/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.planner;

import com.google.common.base.Preconditions;
import java.util.List;
import org.apache.impala.analysis.Expr;
import org.apache.impala.planner.DataPartition;
import org.apache.impala.planner.DataSink;
import org.apache.impala.planner.ExchangeNode;
import org.apache.impala.planner.PlanNode;
import org.apache.impala.planner.ProcessingCost;
import org.apache.impala.planner.ResourceProfile;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TDataSink;
import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TDataStreamSink;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TQueryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Data sink that sends a plan fragment's output rows to an {@link ExchangeNode}.
 *
 * <p>The sink is described by the destination exchange node and by the
 * {@link DataPartition} that is serialized to Thrift alongside the exchange id. Besides
 * Thrift serialization and explain output, this class produces two planner estimates:
 * a memory estimate for the sender's outbound row batch buffers
 * ({@link #computeResourceProfile}) and a CPU cost estimate
 * ({@link #computeProcessingCost}).
 */
public class DataStreamSink extends DataSink {
    private static final Logger LOG = LoggerFactory.getLogger(DataStreamSink.class);

    // Coefficients of the sender-side cost model used by computeProcessingCost().
    // "ROWS" coefficients are multiplied by the output cardinality, "BYTES" coefficients
    // by the estimated output size in bytes.
    private static final double COST_COEFFICIENT_MERGING_XCHG_SNDR_ROWS = 0.1047;
    private static final double COST_COEFFICIENT_MERGING_XCHG_SNDR_BYTES = 0.0262;
    private static final double COST_COEFFICIENT_BCAST_XCHG_SNDR_BYTES = 0.0027;
    private static final double COST_COEFFICIENT_PART_XCHG_SNDR_BYTES = 0.0644;

    // Upper bound (8 MiB) applied to the estimated size of one outbound row batch.
    private static final long MAX_OUTBOUND_ROW_BATCH_BYTES = 8L * 1024L * 1024L;

    // Multipliers of the buffer estimate: outbound batches assumed per channel, and
    // buffers assumed per outbound batch.
    // NOTE(review): presumably these mirror the backend sender's layout - confirm.
    private static final int OUTBOUND_BATCHES_PER_CHANNEL = 2;
    private static final int BUFFERS_PER_OUTBOUND_BATCH = 2;

    // Exchange node that receives the rows sent by this sink.
    private final ExchangeNode exchNode_;
    // Partition descriptor serialized with this sink; also decides the channel count.
    private final DataPartition outputPartition_;

    /**
     * Creates a sink that feeds {@code exchNode} using {@code partition}.
     *
     * @param exchNode destination exchange node; must not be null
     * @param partition output partition of the sink; must not be null
     * @throws NullPointerException if either argument is null
     */
    public DataStreamSink(ExchangeNode exchNode, DataPartition partition) {
        this.exchNode_ = Preconditions.checkNotNull(exchNode);
        this.outputPartition_ = Preconditions.checkNotNull(partition);
    }

    /**
     * Appends a single explain line naming the destination fragment, the exchange id and
     * the exchange's display label detail, followed by a newline.
     *
     * <p>{@code detailPrefix}, {@code queryOptions} and {@code detailLevel} are not used
     * by this implementation.
     */
    @Override
    public void appendSinkExplainString(String prefix, String detailPrefix,
            TQueryOptions queryOptions, TExplainLevel detailLevel, StringBuilder output) {
        String fragmentId = this.exchNode_.getFragment().getId().toString();
        String exchangeId = this.exchNode_.getId().toString();
        output.append(String.format("%sDATASTREAM SINK [FRAGMENT=%s, EXCHANGE=%s, %s]",
                prefix, fragmentId, exchangeId, this.exchNode_.getDisplayLabelDetail()));
        output.append("\n");
    }

    /** Returns the label of this sink; it prefixes the processing cost label. */
    @Override
    protected String getLabel() {
        return "EXCHANGE SENDER";
    }

    /**
     * Estimates the bytes of memory held by the sender's outbound row batch buffers.
     *
     * <p>The estimate is
     * {@code channels * batchesPerChannel * buffersPerBatch * avgBatchBytes} where
     * {@code avgBatchBytes} is the rows per batch times the average serialized row size
     * of the exchange, capped at {@link #MAX_OUTBOUND_ROW_BATCH_BYTES}. A partitioned
     * sink uses one channel per instance of the exchange's fragment; otherwise a single
     * channel is assumed.
     *
     * @param queryOptions query options used to look up the row batch size when the
     *     backend buffer size is not applicable
     * @return the estimated buffer memory in bytes
     */
    private long estimateOutboundRowBatchBuffers(TQueryOptions queryOptions) {
        long beBufferBytes = BackendConfig.INSTANCE.dataStreamSenderBufferSizeUsedByPlanner();
        long fixedLenRowSize = this.exchNode_.getFixedLenRowSize();
        if (fixedLenRowSize == 0L) {
            // Avoid dividing by zero for rows without a fixed-length part.
            fixedLenRowSize = 1L;
        }
        // Truncating division, floored at one row. The original wrapped this quotient in
        // Math.ceil(), which had no effect because both operands are integral.
        // NOTE(review): if rounding up was intended the estimate would change by at most
        // one row per buffer - confirm against the backend before changing it.
        long beRowsPerBuffer = Math.max(1L, beBufferBytes / fixedLenRowSize);

        boolean partitioned = this.outputPartition_.isPartitioned();
        // The backend-derived row count is only used for partitioned sinks and only when
        // a positive buffer size is configured.
        boolean useBeLogic = beBufferBytes > 0L && partitioned;
        int numChannels = partitioned ? this.exchNode_.getFragment().getNumInstances() : 1;
        long rowBatchSize =
                useBeLogic ? beRowsPerBuffer : PlanNode.getRowBatchSize(queryOptions);

        double avgSerializedRowSize = ExchangeNode.getAvgSerializedRowSize(this.exchNode_);
        long avgOutboundRowBatchSize = Math.min(
                (long) Math.ceil((double) rowBatchSize * avgSerializedRowSize),
                MAX_OUTBOUND_ROW_BATCH_BYTES);

        // Widen to long before multiplying so the channel/buffer product cannot overflow
        // int arithmetic.
        return (long) numChannels * OUTBOUND_BATCHES_PER_CHANNEL * BUFFERS_PER_OUTBOUND_BATCH
                * avgOutboundRowBatchSize;
    }

    /**
     * Computes the CPU cost estimate of this sink and stores it in
     * {@code processingCost_}.
     *
     * <p>The cost depends on the kind of the destination exchange:
     * <ul>
     *   <li>merging: a per-row term plus a per-byte term;</li>
     *   <li>broadcast: a per-byte term;</li>
     *   <li>anything else (reported as DIRECTED or PARTITIONED): a per-byte term.</li>
     * </ul>
     * The cardinality is the exchange's filtered cardinality clamped to be non-negative,
     * and the byte size is that cardinality times the average deserialized row size.
     *
     * @param queryOptions not used by this implementation
     */
    @Override
    public void computeProcessingCost(TQueryOptions queryOptions) {
        // Clamp so that a negative cardinality yields a zero cost rather than a
        // negative one.
        long outputCardinality = Math.max(0L, this.exchNode_.getFilteredCardinality());
        long outputSize =
                (long) (this.exchNode_.getAvgDeserializedRowSize() * (double) outputCardinality);

        final String exchType;
        final double totalCost;
        if (this.exchNode_.isMergingExchange()) {
            exchType = "MERGING";
            totalCost = (double) outputCardinality * COST_COEFFICIENT_MERGING_XCHG_SNDR_ROWS
                    + (double) outputSize * COST_COEFFICIENT_MERGING_XCHG_SNDR_BYTES;
        } else if (this.exchNode_.isBroadcastExchange()) {
            exchType = "BROADCAST";
            totalCost = (double) outputSize * COST_COEFFICIENT_BCAST_XCHG_SNDR_BYTES;
        } else {
            // Directed and partitioned exchanges share the same coefficient; the name
            // only differs in the trace output.
            exchType = this.exchNode_.isDirectedExchange() ? "DIRECTED" : "PARTITIONED";
            totalCost = (double) outputSize * COST_COEFFICIENT_PART_XCHG_SNDR_BYTES;
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace("Total CPU cost estimate: {}, Exchange Type: {}, Output Card: {}, "
                    + "Output Size: {}", totalCost, exchType, outputCardinality, outputSize);
        }
        String costLabel = this.getLabel() + "(" + this.exchNode_.getDisplayLabel() + ")";
        this.processingCost_ = ProcessingCost.basicCost(costLabel, totalCost);
    }

    /**
     * Sets {@code resourceProfile_} to a profile without reservation whose memory
     * estimate is the estimated size of the outbound row batch buffers.
     */
    @Override
    public void computeResourceProfile(TQueryOptions queryOptions) {
        long estimatedMem = this.estimateOutboundRowBatchBuffers(queryOptions);
        this.resourceProfile_ = ResourceProfile.noReservation(estimatedMem);
    }

    /**
     * Fills {@code tsink} with a {@link TDataStreamSink} built from the destination
     * exchange id and the Thrift form of the output partition.
     */
    @Override
    protected void toThriftImpl(TDataSink tsink) {
        int destNodeId = this.exchNode_.getId().asInt();
        tsink.setStream_sink(new TDataStreamSink(destNodeId, this.outputPartition_.toThrift()));
    }

    /** Returns {@link TDataSinkType#DATA_STREAM_SINK}. */
    @Override
    protected TDataSinkType getSinkType() {
        return TDataSinkType.DATA_STREAM_SINK;
    }

    /** Returns the output partition this sink was constructed with. */
    public DataPartition getOutputPartition() {
        return this.outputPartition_;
    }

    /** Adds the partition expressions of the output partition to {@code exprs}. */
    @Override
    public void collectExprs(List<Expr> exprs) {
        exprs.addAll(this.outputPartition_.getPartitionExprs());
    }
}

