/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.mapreduce.index;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.PhoenixJobCounters;
import org.apache.phoenix.mapreduce.index.DirectHTableWriter;
import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhoenixIndexImportDirectMapper
extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixIndexImportDirectMapper.class);
    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
    private List<ColumnInfo> indxTblColumnMetadata;
    private Connection connection;
    private PreparedStatement pStatement;
    private DirectHTableWriter writer;
    private int batchSize;
    private long batchSizeBytes;
    private MutationState mutationState;
    private int currentBatchCount = 0;
    private IndexUtil.IndexStatusUpdater indexStatusUpdater;

    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration configuration = context.getConfiguration();
        this.writer = new DirectHTableWriter(configuration);
        try {
            this.indxTblColumnMetadata = PhoenixConfigurationUtil.getUpsertColumnMetadataList(configuration);
            this.indxWritable.setColumnMetadata(this.indxTblColumnMetadata);
            Properties overrideProps = new Properties();
            String scn = configuration.get("phoenix.mr.currentscn.value");
            String txScnValue = configuration.get("phoenix.mr.txscn.value");
            if (txScnValue == null && scn != null) {
                overrideProps.put("BuildIndexAt", scn);
            }
            this.connection = ConnectionUtil.getOutputConnection((Configuration)configuration, (Properties)overrideProps);
            this.connection.setAutoCommit(false);
            ConnectionQueryServices services = ((PhoenixConnection)this.connection).getQueryServices();
            int maxSize = services.getProps().getInt("phoenix.mutate.maxSize", 500000);
            this.batchSize = Math.min(((PhoenixConnection)this.connection).getMutateBatchSize(), maxSize);
            this.batchSizeBytes = ((PhoenixConnection)this.connection).getMutateBatchSizeBytes();
            LOGGER.info("Mutation Batch Size = " + this.batchSize);
            String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
            this.pStatement = this.connection.prepareStatement(upsertQuery);
            String indexTableName = PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration);
            PTable pIndexTable = this.connection.unwrap(PhoenixConnection.class).getTable(indexTableName);
            this.indexStatusUpdater = new IndexUtil.IndexStatusUpdater(SchemaUtil.getEmptyColumnFamily((PTable)pIndexTable), (byte[])EncodedColumnsUtil.getEmptyKeyValueInfo((PTable)pIndexTable).getFirst());
        }
        catch (Exception e) {
            this.tryClosingResources();
            throw new RuntimeException(e);
        }
    }

    protected void map(NullWritable key, PhoenixIndexDBWritable record, Mapper.Context context) throws IOException, InterruptedException {
        try {
            ++this.currentBatchCount;
            List<Object> values = record.getValues();
            this.indxWritable.setValues(values);
            this.indxWritable.write(this.pStatement);
            this.pStatement.execute();
            PhoenixConnection pconn = this.connection.unwrap(PhoenixConnection.class);
            MutationState currentMutationState = pconn.getMutationState();
            if (this.mutationState == null) {
                this.mutationState = currentMutationState;
            }
            this.mutationState.join(currentMutationState);
            if (this.currentBatchCount % this.batchSize == 0) {
                this.writeBatch(this.mutationState, context);
                this.mutationState = null;
            }
            context.progress();
        }
        catch (SQLException e) {
            LOGGER.error(" Error {}  while read/write of a record ", (Object)e.getMessage());
            context.getCounter((Enum)PhoenixJobCounters.FAILED_RECORDS).increment((long)this.currentBatchCount);
            throw new RuntimeException(e);
        }
        context.getCounter((Enum)PhoenixJobCounters.INPUT_RECORDS).increment(1L);
    }

    private void writeBatch(MutationState mutationState, Mapper.Context context) throws IOException, SQLException, InterruptedException {
        Iterator iterator = mutationState.toMutations(true, null);
        while (iterator.hasNext()) {
            Pair mutationPair = (Pair)iterator.next();
            List batchMutations = (List)mutationPair.getSecond();
            List batchOfBatchMutations = MutationState.getMutationBatchList((long)this.batchSize, (long)this.batchSizeBytes, (List)batchMutations);
            for (List mutationList : batchOfBatchMutations) {
                for (Mutation mutation : mutationList) {
                    this.indexStatusUpdater.setVerified(mutation.cellScanner());
                }
                this.writer.write(mutationList);
            }
            context.getCounter((Enum)PhoenixJobCounters.OUTPUT_RECORDS).increment((long)((List)mutationPair.getSecond()).size());
        }
        this.connection.rollback();
        this.currentBatchCount = 0;
    }

    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
        try {
            if (this.mutationState != null) {
                this.writeBatch(this.mutationState, context);
            }
            context.write((Object)new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)), (Object)new IntWritable(0));
            super.cleanup(context);
        }
        catch (SQLException e) {
            LOGGER.error(" Error {}  while read/write of a record ", (Object)e.getMessage());
            context.getCounter((Enum)PhoenixJobCounters.FAILED_RECORDS).increment((long)this.currentBatchCount);
            throw new RuntimeException(e);
        }
        finally {
            this.tryClosingResources();
        }
    }

    private void tryClosingResources() throws IOException {
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (SQLException e) {
                LOGGER.error("Error while closing connection in the PhoenixIndexMapper class ", (Throwable)e);
            }
        }
        if (this.writer != null) {
            this.writer.close();
        }
    }
}

