/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.hive.mapreduce;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.hive.PhoenixSerializer;
import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable;
import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.hive.util.PhoenixUtil;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.ConcurrentTableMutationException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhoenixRecordWriter<T extends DBWritable>
implements RecordWriter<NullWritable, T>,
FileSinkOperator.RecordWriter,
RecordUpdater {
    private static final Logger LOG = LoggerFactory.getLogger(PhoenixRecordWriter.class);
    private Connection conn;
    private PreparedStatement pstmt;
    private long batchSize;
    private long numRecords = 0L;
    private Configuration config;
    private String tableName;
    private MetaDataClient metaDataClient;
    private boolean restoreWalMode;
    private long rowCountDelta = 0L;
    private PhoenixSerializer phoenixSerializer;
    private ObjectInspector objInspector;
    private PreparedStatement pstmtForDelete;

    public PhoenixRecordWriter(Path path, AcidOutputFormat.Options options) throws IOException {
        Configuration config = options.getConfiguration();
        Properties props = new Properties();
        try {
            this.initialize(config, props);
        }
        catch (SQLException e) {
            throw new IOException(e);
        }
        this.objInspector = options.getInspector();
        try {
            this.phoenixSerializer = new PhoenixSerializer(config, options.getTableProperties());
        }
        catch (SerDeException e) {
            throw new IOException(e);
        }
    }

    public PhoenixRecordWriter(Configuration configuration, Properties props) throws SQLException {
        this.initialize(configuration, props);
    }

    private void initialize(Configuration config, Properties properties) throws SQLException {
        this.config = config;
        this.tableName = config.get("phoenix.table.name");
        String walConfigName = this.tableName.toLowerCase() + ".disable.wal";
        boolean disableWal = config.getBoolean(walConfigName, false);
        if (disableWal) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Property " + walConfigName + " is true. batch.mode will be set true. ");
            }
            properties.setProperty("batch.mode", "true");
        }
        this.conn = PhoenixConnectionUtil.getInputConnection(config, properties);
        if (disableWal) {
            this.metaDataClient = new MetaDataClient((PhoenixConnection)this.conn);
            if (!PhoenixUtil.isDisabledWal(this.metaDataClient, this.tableName)) {
                block9: {
                    try {
                        PhoenixUtil.alterTableForWalDisable(this.conn, this.tableName, true);
                    }
                    catch (ConcurrentTableMutationException e) {
                        if (!LOG.isWarnEnabled()) break block9;
                        LOG.warn("Another mapper or task processing wal disable");
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.tableName + "s wal disabled.");
                }
                this.restoreWalMode = true;
            }
        }
        this.batchSize = PhoenixConfigurationUtil.getBatchSize((Configuration)config);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Batch-size : " + this.batchSize);
        }
        String upsertQuery = QueryUtil.constructUpsertStatement((String)this.tableName, PhoenixUtil.getColumnInfoList(this.conn, this.tableName));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Upsert-query : " + upsertQuery);
        }
        this.pstmt = this.conn.prepareStatement(upsertQuery);
    }

    public void write(NullWritable key, T record) throws IOException {
        try {
            record.write(this.pstmt);
            ++this.numRecords;
            this.pstmt.executeUpdate();
            if (this.numRecords % this.batchSize == 0L) {
                LOG.debug("Commit called on a batch of size : " + this.batchSize);
                this.conn.commit();
            }
        }
        catch (SQLException e) {
            throw new IOException("Exception while writing to table.", e);
        }
    }

    public void close(Reporter reporter) throws IOException {
        try {
            this.conn.commit();
            if (LOG.isInfoEnabled()) {
                LOG.info("Wrote row : " + this.numRecords);
            }
        }
        catch (SQLException e) {
            LOG.error("SQLException while performing the commit for the task.");
            throw new IOException(e);
        }
        finally {
            try {
                String autoFlushConfigName;
                boolean autoFlush;
                if (this.restoreWalMode && PhoenixUtil.isDisabledWal(this.metaDataClient, this.tableName)) {
                    block19: {
                        try {
                            PhoenixUtil.alterTableForWalDisable(this.conn, this.tableName, false);
                        }
                        catch (ConcurrentTableMutationException e) {
                            if (!LOG.isWarnEnabled()) break block19;
                            LOG.warn("Another mapper or task processing wal enable");
                        }
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(this.tableName + "s wal enabled.");
                    }
                }
                if (autoFlush = this.config.getBoolean(autoFlushConfigName = this.tableName.toLowerCase() + ".auto.flush", false)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("autoFlush is true.");
                    }
                    PhoenixUtil.flush(this.conn, this.tableName);
                }
                PhoenixUtil.closeResource(this.pstmt);
                PhoenixUtil.closeResource(this.pstmtForDelete);
                PhoenixUtil.closeResource(this.conn);
            }
            catch (SQLException ex) {
                LOG.error("SQLException while closing the connection for the task.");
                throw new IOException(ex);
            }
        }
    }

    public boolean isRestoreWalMode() {
        return this.restoreWalMode;
    }

    public void write(Writable w) throws IOException {
        PhoenixResultWritable row = (PhoenixResultWritable)w;
        this.write(NullWritable.get(), (T)row);
    }

    public void close(boolean abort) throws IOException {
        this.close(Reporter.NULL);
    }

    public void insert(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("insert transaction : " + currentTransaction + ", row : " + PhoenixStorageHandlerUtil.toString(row));
        }
        PhoenixResultWritable pResultWritable = (PhoenixResultWritable)this.phoenixSerializer.serialize(row, this.objInspector, PhoenixSerializer.DmlType.INSERT);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }
        this.write(pResultWritable);
        ++this.rowCountDelta;
    }

    public void update(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("update transaction : " + currentTransaction + ", row : " + PhoenixStorageHandlerUtil.toString(row));
        }
        PhoenixResultWritable pResultWritable = (PhoenixResultWritable)this.phoenixSerializer.serialize(row, this.objInspector, PhoenixSerializer.DmlType.UPDATE);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }
        this.write(pResultWritable);
    }

    public void delete(long currentTransaction, Object row) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("delete transaction : " + currentTransaction + ", row : " + PhoenixStorageHandlerUtil.toString(row));
        }
        PhoenixResultWritable pResultWritable = (PhoenixResultWritable)this.phoenixSerializer.serialize(row, this.objInspector, PhoenixSerializer.DmlType.DELETE);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Data : " + pResultWritable.getValueList());
        }
        if (this.pstmtForDelete == null) {
            try {
                String deleteQuery = PhoenixUtil.constructDeleteStatement(this.conn, this.tableName);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Delete query : " + deleteQuery);
                }
                this.pstmtForDelete = this.conn.prepareStatement(deleteQuery);
            }
            catch (SQLException e) {
                throw new IOException(e);
            }
        }
        this.delete(pResultWritable);
        --this.rowCountDelta;
    }

    private void delete(PhoenixResultWritable pResultWritable) throws IOException {
        try {
            pResultWritable.delete(this.pstmtForDelete);
            ++this.numRecords;
            this.pstmtForDelete.executeUpdate();
            if (this.numRecords % this.batchSize == 0L) {
                LOG.debug("Commit called on a batch of size : " + this.batchSize);
                this.conn.commit();
            }
        }
        catch (SQLException e) {
            throw new IOException("Exception while deleting to table.", e);
        }
    }

    public void flush() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Flush called");
        }
        try {
            this.conn.commit();
            if (LOG.isInfoEnabled()) {
                LOG.info("Written row : " + this.numRecords);
            }
        }
        catch (SQLException e) {
            LOG.error("SQLException while performing the commit for the task.");
            throw new IOException(e);
        }
    }

    public SerDeStats getStats() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("getStats called");
        }
        SerDeStats stats = new SerDeStats();
        stats.setRowCount(this.rowCountDelta);
        return stats;
    }

    public long getBufferedRowCount() {
        return this.numRecords;
    }
}

