/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.tunnel.io;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.ArrowUtils;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.rest.ResourceBuilder;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.tunnel.Configuration;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.io.ArrowHttpInputStream;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.utils.StringUtils;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.ArrowMessage;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageChannelReader;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public class ArrowTunnelRecordReader
implements ArrowRecordReader {
    private RestClient tunnelServiceClient;
    private long start = 0L;
    private long count = 0L;
    private List<Column> columnList;
    private TableTunnel.DownloadSession tableSession;
    private boolean isClosed;
    private ArrowHttpInputStream inputStream;
    private Connection connection;
    private BufferAllocator allocator;
    private MessageChannelReader messageReader;
    private Schema arrowSchema;
    private CompressOption compression;

    public ArrowTunnelRecordReader(long start, long count, List<Column> columns, RestClient tunnelRestClient, TableTunnel.DownloadSession session, BufferAllocator allocator, CompressOption compress) throws TunnelException, IOException {
        this.start = start;
        this.count = count;
        this.columnList = columns;
        this.allocator = allocator == null ? new RootAllocator(Long.MAX_VALUE) : allocator;
        this.tunnelServiceClient = tunnelRestClient;
        this.tableSession = session;
        this.isClosed = false;
        this.arrowSchema = ArrowUtils.tableSchemaToArrowSchema(session.getSchema(), columns);
        this.compression = compress;
        this.openReaderConnection(this.start, this.count, this.columnList, this.tunnelServiceClient, this.tableSession);
    }

    private ArrowRecordBatch readBatch() throws IOException {
        ArrowMessage deserializeMessageBatch;
        if (this.isClosed) {
            throw new IOException("Arrow reader is closed");
        }
        if (this.inputStream == null) {
            this.inputStream = new ArrowHttpInputStream(this.connection.getInputStream(), this.compression);
            this.messageReader = new MessageChannelReader(new ReadChannel((ReadableByteChannel)this.inputStream), this.allocator);
        }
        return (deserializeMessageBatch = MessageSerializer.deserializeMessageBatch((MessageChannelReader)this.messageReader)) == null ? null : (ArrowRecordBatch)deserializeMessageBatch;
    }

    @Override
    public VectorSchemaRoot read() throws IOException {
        ArrayList<FieldVector> vectors = new ArrayList<FieldVector>();
        for (Field field : this.arrowSchema.getFields()) {
            vectors.add(field.createVector(this.allocator));
        }
        VectorSchemaRoot root = new VectorSchemaRoot(this.arrowSchema, vectors, 0);
        VectorLoader loader = new VectorLoader(root);
        ArrowRecordBatch recordBatch = this.readBatch();
        if (recordBatch == null) {
            return null;
        }
        loader.load(recordBatch);
        recordBatch.close();
        return root;
    }

    @Override
    public long bytesRead() {
        if (this.messageReader != null) {
            return this.messageReader.bytesRead();
        }
        return 0L;
    }

    @Override
    public void close() throws IOException {
        if (!this.isClosed) {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
            this.connection.disconnect();
            this.isClosed = true;
        }
    }

    private void openReaderConnection(long start, long count, List<Column> columns, RestClient restClient, TableTunnel.DownloadSession session) throws IOException, TunnelException {
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("Content-Length", String.valueOf(0));
        headers.put("x-odps-tunnel-version", String.valueOf(5));
        switch (this.compression.algorithm) {
            case ODPS_RAW: {
                break;
            }
            case ODPS_ZLIB: {
                headers.put("Accept-Encoding", "deflate");
                break;
            }
            case ODPS_SNAPPY: {
                headers.put("Accept-Encoding", "x-snappy-framed");
                break;
            }
            case ODPS_ARROW_LZ4_FRAME: {
                headers.put("Accept-Encoding", "x-odps-lz4-frame");
                break;
            }
            default: {
                throw new TunnelException("invalid compression option.");
            }
        }
        if (columns != null && columns.size() != 0) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < columns.size(); ++i) {
                sb.append(columns.get(i).getName());
                if (i == columns.size() - 1) continue;
                sb.append(",");
            }
            params.put("columns", sb.toString());
        }
        params.put("downloadid", session.getId());
        params.put("data", null);
        params.put("rowrange", "(" + start + "," + count + ")");
        String partitionSpec = session.getPartitionSpec();
        if (partitionSpec != null && partitionSpec.length() > 0) {
            params.put("partition", partitionSpec);
        }
        params.put("arrow", "");
        Configuration conf = this.tableSession.getConfig();
        if (!StringUtils.isNullOrEmpty((String)conf.getQuotaName())) {
            params.put("quotaName", conf.getQuotaName());
        }
        Connection conn = null;
        try {
            String resource = ResourceBuilder.buildTableResource(session.getProjectName(), session.getSchemaName(), session.getTableName());
            conn = restClient.connect(resource, "GET", params, headers);
            Response resp = conn.getResponse();
            if (!resp.isOK()) {
                TunnelException err = new TunnelException(conn.getInputStream());
                err.setRequestId(resp.getHeader("x-odps-request-id"));
                throw err;
            }
            CompressOption reply_compression = null;
            String content_encoding = resp.getHeader("Content-Encoding");
            if (content_encoding != null) {
                if (content_encoding.equals("deflate")) {
                    reply_compression = new CompressOption(CompressOption.CompressAlgorithm.ODPS_ZLIB, -1, 0);
                } else if (content_encoding.equals("x-snappy-framed")) {
                    reply_compression = new CompressOption(CompressOption.CompressAlgorithm.ODPS_SNAPPY, -1, 0);
                } else if (content_encoding.equals("x-odps-lz4-frame")) {
                    reply_compression = new CompressOption(CompressOption.CompressAlgorithm.ODPS_ARROW_LZ4_FRAME, -1, 0);
                } else {
                    throw new TunnelException("invalid content encoding");
                }
            }
            this.compression = reply_compression;
            this.connection = conn;
        }
        catch (IOException e) {
            if (this.connection != null) {
                this.connection.disconnect();
            }
            throw new TunnelException(e.getMessage(), e);
        }
        catch (TunnelException e) {
            throw e;
        }
        catch (OdpsException e) {
            if (this.connection != null) {
                this.connection.disconnect();
            }
            throw new TunnelException(e.getMessage(), e);
        }
    }
}

