/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.streaming.ConnectionError;
import org.apache.hive.streaming.ConnectionStats;
import org.apache.hive.streaming.InvalidTable;
import org.apache.hive.streaming.InvalidTransactionState;
import org.apache.hive.streaming.PartitionInfo;
import org.apache.hive.streaming.RecordWriter;
import org.apache.hive.streaming.SerializationError;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.TransactionError;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveStreamingConnection
implements StreamingConnection {
    /** Class-wide logger; the nested TransactionBatch and HeartbeatRunnable log through it as well. */
    private static final Logger LOG = LoggerFactory.getLogger((String)HiveStreamingConnection.class.getName());
    /** Metastore URI used to build a HiveConf when the builder did not supply one. */
    private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083";
    // NOTE(review): not referenced in the visible part of this file; presumably the builder's default batch size - confirm.
    private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1;
    // NOTE(review): not referenced by name in the visible code; setupHeartBeatThread falls back to the same value (60000 ms) as a literal.
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 60000;
    // NOTE(review): not referenced in the visible part of this file; presumably the builder's default - confirm.
    private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true;
    /** Target database name, lower-cased in the constructor. */
    private String database;
    /** Target table name, lower-cased in the constructor. */
    private String table;
    /** Static partition values from the builder; null or empty means none were given. */
    private List<String> staticPartitionValues;
    /** Agent identifier; defaults to "user:host:thread" (or a random UUID) when the builder gave none. */
    private String agentInfo;
    /** Number of transactions opened per TransactionBatch. */
    private int transactionBatchSize;
    /** Writer that receives every record written through this connection. */
    private RecordWriter recordWriter;
    /** Batch currently in use; null until the first beginTransaction(). */
    private TransactionBatch currentTransactionBatch;
    /** Hive configuration; created with the default metastore URI when the builder gave none. */
    private HiveConf conf;
    /** When true, overrideConfSettings turns on the ORC delta streaming optimizations flag. */
    private boolean streamingOptimizations;
    /** Set to true by close(); checked by checkClosedState(). */
    private AtomicBoolean isConnectionClosed = new AtomicBoolean(false);
    /** True when the validated table has at least one partition key. */
    private boolean isPartitionedTable;
    /** Metastore client used for all non-heartbeat calls (see getMSC()). */
    private IMetaStoreClient msClient;
    /** Separate metastore client handed out by getHeatbeatMSC() for heartbeat calls. */
    private IMetaStoreClient heartbeatMSClient;
    /** Short name of the UGI login user, or the "user.name" system property when UGI lookup failed. */
    private final String username;
    /** True when the login user has Kerberos credentials. */
    private final boolean secureMode;
    /** Table metadata loaded by validateTable(). */
    private Table tableObject = null;
    /** Metastore URI read back from the configuration after overrides were applied. */
    private String metastoreUri;
    /** Counters for metastore calls and committed/aborted transactions. */
    private ConnectionStats connectionStats;

    /**
     * Creates a connection from the builder's settings: resolves the calling user, fills in a
     * default agent info and HiveConf when missing, applies the streaming configuration overrides,
     * opens the two metastore clients and validates the target table.
     *
     * <p>If anything fails after the first metastore client has been opened, the clients opened so
     * far are closed before the exception propagates. A failing constructor never hands out an
     * instance, so nobody else could release them through {@link #close()}.
     *
     * @param builder source of database/table names, static partition values, configuration,
     *                agent info, batch size, record writer and the streaming-optimizations flag
     * @throws StreamingException if a metastore client cannot be created or the table is not usable
     */
    private HiveStreamingConnection(Builder builder) throws StreamingException {
        this.database = builder.database.toLowerCase();
        this.table = builder.table.toLowerCase();
        this.staticPartitionValues = builder.staticPartitionValues;
        this.conf = builder.hiveConf;
        this.agentInfo = builder.agentInfo;
        this.streamingOptimizations = builder.streamingOptimizations;
        UserGroupInformation loggedInUser = null;
        try {
            loggedInUser = UserGroupInformation.getLoginUser();
        }
        catch (IOException e) {
            // best effort: fall back to the JVM user below
            LOG.warn("Unable to get logged in user via UGI. err: {}", (Object)e.getMessage());
        }
        if (loggedInUser == null) {
            this.username = System.getProperty("user.name");
            this.secureMode = false;
        } else {
            this.username = loggedInUser.getShortUserName();
            this.secureMode = loggedInUser.hasKerberosCredentials();
        }
        this.transactionBatchSize = builder.transactionBatchSize;
        this.recordWriter = builder.recordWriter;
        this.connectionStats = new ConnectionStats();
        if (this.agentInfo == null) {
            try {
                this.agentInfo = this.username + ":" + InetAddress.getLocalHost().getHostName() + ":" + Thread.currentThread().getName();
            }
            catch (UnknownHostException e) {
                // host name is only used for identification; any unique string will do
                this.agentInfo = UUID.randomUUID().toString();
            }
        }
        if (this.conf == null) {
            this.conf = this.createHiveConf(this.getClass(), DEFAULT_METASTORE_URI);
        }
        this.overrideConfSettings(this.conf);
        this.metastoreUri = this.conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
        this.msClient = HiveStreamingConnection.getMetaStoreClient(this.conf, this.metastoreUri, this.secureMode, "streaming-connection");
        try {
            this.heartbeatMSClient = HiveStreamingConnection.getMetaStoreClient(this.conf, this.metastoreUri, this.secureMode, "streaming-connection-heartbeat");
            this.validateTable();
            LOG.info("STREAMING CONNECTION INFO: {}", (Object)this.toConnectionInfoString());
        }
        catch (StreamingException | RuntimeException e) {
            // Construction failed: release whatever metastore clients were already opened.
            for (IMetaStoreClient client : new IMetaStoreClient[]{this.msClient, this.heartbeatMSClient}) {
                if (client == null) {
                    continue;
                }
                try {
                    client.close();
                }
                catch (RuntimeException closeError) {
                    // keep the original failure as the primary exception
                    e.addSuppressed(closeError);
                }
            }
            throw e;
        }
    }

    /**
     * Entry point for configuring a new streaming connection.
     *
     * @return a freshly created {@code Builder}
     */
    public static Builder newBuilder() {
        final Builder builder = new Builder();
        return builder;
    }

    /**
     * Records whether the validated table is partitioned.
     *
     * @param isPartitionedTable true when the table has partition keys
     */
    private void setPartitionedTable(final boolean isPartitionedTable) {
        this.isPartitionedTable = isPartitionedTable;
    }

    /**
     * Short description of the connection: metastore URI, database and table.
     *
     * @return a brace-delimited summary string
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("{ metaStoreUri: ");
        text.append(metastoreUri);
        text.append(", database: ").append(database);
        text.append(", table: ").append(table);
        text.append(" }");
        return text.toString();
    }

    /**
     * Detailed description of the connection used in log and error messages. It adds the
     * partitioning mode, user, security mode, record writer class and agent info to the basic
     * URI/database/table triple.
     *
     * @return a brace-delimited description string
     */
    private String toConnectionInfoString() {
        StringBuilder info = new StringBuilder("{ metastore-uri: ").append(metastoreUri);
        info.append(", database: ").append(database);
        info.append(", table: ").append(table);
        info.append(", partitioned-table: ").append(isPartitionedTable());
        info.append(", dynamic-partitioning: ").append(isDynamicPartitioning());
        info.append(", username: ").append(username);
        info.append(", secure-mode: ").append(secureMode);
        info.append(", record-writer: ").append(recordWriter.getClass().getSimpleName());
        info.append(", agent-info: ").append(agentInfo);
        info.append(" }");
        return info.toString();
    }

    /**
     * Describes the current transaction batch.
     *
     * @return the batch's string form, or an empty string when no batch has been opened yet
     */
    @VisibleForTesting
    String toTransactionString() {
        if (currentTransactionBatch == null) {
            return "";
        }
        return currentTransactionBatch.toString();
    }

    /**
     * Adds the partition identified by {@code partitionValues} to the table through the metastore.
     * An {@code AlreadyExistsException} is not an error; it is reported through the
     * returned info's "exists" flag.
     *
     * @param partitionValues values for the table's partition keys
     * @return the partition's name, its location and whether it already existed
     * @throws StreamingException if building or registering the partition fails
     */
    @Override
    public PartitionInfo createPartitionIfNotExists(List<String> partitionValues) throws StreamingException {
        String name = null;
        String location = null;
        boolean alreadyPresent = false;
        try {
            Map spec = Warehouse.makeSpecFromValues((List)tableObject.getPartitionKeys(), partitionValues);
            AddPartitionDesc desc = new AddPartitionDesc(database, table, true);
            name = Warehouse.makePartName((List)tableObject.getPartitionKeys(), partitionValues);
            Path partitionPath = new Path(tableObject.getDataLocation(), Warehouse.makePartPath((Map)spec));
            location = partitionPath.toString();
            desc.addPartition(spec, location);
            Partition metaPartition = Hive.convertAddSpecToMetaPartition(tableObject, desc.getPartition(0), conf);
            getMSC().add_partition(metaPartition);
        } catch (AlreadyExistsException e) {
            // the partition is already registered; report that instead of failing
            alreadyPresent = true;
        } catch (HiveException | TException e) {
            throw new StreamingException("Unable to creation partition for values: " + partitionValues + " connection: " + toConnectionInfoString(), e);
        }
        return new PartitionInfo(name, location, alreadyPresent);
    }

    /**
     * Hands out the primary metastore client and counts the access as one metastore call in the
     * connection statistics.
     *
     * @return the primary metastore client
     */
    IMetaStoreClient getMSC() {
        connectionStats.incrementMetastoreCalls();
        return msClient;
    }

    /**
     * Hands out the metastore client reserved for heartbeats and counts the access as one
     * metastore call in the connection statistics.
     *
     * @return the heartbeat metastore client
     */
    IMetaStoreClient getHeatbeatMSC() {
        connectionStats.incrementMetastoreCalls();
        return heartbeatMSClient;
    }

    /**
     * Loads the target table from the metastore and checks that it can be streamed into. The table
     * must exist, must be a full ACID table, and static partition values may only be given for a
     * partitioned table. Also records whether the table is partitioned.
     *
     * @throws InvalidTable if the table cannot be loaded or is not a full ACID table
     * @throws ConnectionError if static partition values were given for an un-partitioned table
     */
    private void validateTable() throws InvalidTable, ConnectionError {
        try {
            tableObject = new Table(getMSC().getTable(database, table));
        } catch (Exception e) {
            LOG.warn("Unable to validate the table for connection: " + toConnectionInfoString(), e);
            throw new InvalidTable(database, table, e);
        }

        if (!AcidUtils.isFullAcidTable(tableObject)) {
            LOG.error("HiveEndPoint " + this + " must use an acid table");
            throw new InvalidTable(database, table, "is not an Acid table");
        }

        List<?> partitionKeys = tableObject.getPartitionKeys();
        setPartitionedTable(partitionKeys != null && !partitionKeys.isEmpty());

        boolean staticPartitionsGiven = staticPartitionValues != null && !staticPartitionValues.isEmpty();
        if (staticPartitionsGiven && !isPartitionedTable()) {
            String errMsg = toString() + " specifies partitions for un-partitioned table";
            LOG.error(errMsg);
            throw new ConnectionError(errMsg);
        }
    }

    /**
     * Starts the next transaction, opening the first transaction batch lazily and rolling over to
     * a new batch once the current one has no transactions left.
     *
     * @throws StreamingException if the current batch is closed or a batch/transaction cannot be
     *                            opened
     */
    private void beginNextTransaction() throws StreamingException {
        if (currentTransactionBatch == null) {
            currentTransactionBatch = createNewTransactionBatch();
            LOG.info("Opened new transaction batch {}", currentTransactionBatch);
        }

        if (currentTransactionBatch.isClosed()) {
            throw new StreamingException("Cannot begin next transaction on a closed streaming connection");
        }

        boolean batchExhausted = currentTransactionBatch.remainingTransactions() == 0;
        if (batchExhausted) {
            LOG.info("Transaction batch {} is done. Rolling over to next transaction batch.", currentTransactionBatch);
            currentTransactionBatch.close();
            currentTransactionBatch = createNewTransactionBatch();
            LOG.info("Rolled over to new transaction batch {}", currentTransactionBatch);
        }

        currentTransactionBatch.beginNextTransaction();
    }

    /**
     * Opens a new transaction batch bound to this connection.
     *
     * @return the newly created batch
     * @throws StreamingException if the batch cannot be set up
     */
    private TransactionBatch createNewTransactionBatch() throws StreamingException {
        final TransactionBatch batch = new TransactionBatch(this);
        return batch;
    }

    /**
     * Rejects any operation on a connection that has already been closed.
     *
     * @throws StreamingException if {@link #close()} has been called
     */
    private void checkClosedState() throws StreamingException {
        final boolean closed = isConnectionClosed.get();
        if (closed) {
            throw new StreamingException("Streaming connection is closed already.");
        }
    }

    /**
     * Verifies that the connection is open and that a transaction is currently open on it.
     *
     * @throws StreamingException if the connection is closed, no batch exists yet, or the current
     *                            transaction state is not OPEN
     */
    private void checkState() throws StreamingException {
        checkClosedState();
        final TransactionBatch batch = currentTransactionBatch;
        if (batch == null) {
            throw new StreamingException("Transaction batch is null. Missing beginTransaction?");
        }
        if (batch.state != TxnState.OPEN) {
            throw new StreamingException("Transaction state is not OPEN. Missing beginTransaction?");
        }
    }

    /**
     * Begins a new transaction on this connection.
     *
     * @throws StreamingException if the connection is closed or the transaction cannot be started
     */
    @Override
    public void beginTransaction() throws StreamingException {
        checkClosedState();
        beginNextTransaction();
    }

    /**
     * Commits the currently open transaction and bumps the committed-transactions counter.
     *
     * @throws StreamingException if no transaction is open or the commit fails
     */
    @Override
    public void commitTransaction() throws StreamingException {
        checkState();
        currentTransactionBatch.commit();
        connectionStats.incrementCommittedTransactions();
    }

    /**
     * Aborts the currently open transaction and bumps the aborted-transactions counter.
     *
     * @throws StreamingException if no transaction is open or the abort fails
     */
    @Override
    public void abortTransaction() throws StreamingException {
        checkState();
        currentTransactionBatch.abort();
        connectionStats.incrementAbortedTransactions();
    }

    /**
     * Writes one record into the currently open transaction.
     *
     * @param record the record bytes, passed through to the transaction batch
     * @throws StreamingException if no transaction is open or the write fails
     */
    @Override
    public void write(final byte[] record) throws StreamingException {
        checkState();
        currentTransactionBatch.write(record);
    }

    /**
     * Writes the contents of a stream into the currently open transaction.
     *
     * @param inputStream the stream, passed through to the transaction batch
     * @throws StreamingException if no transaction is open or the write fails
     */
    @Override
    public void write(final InputStream inputStream) throws StreamingException {
        checkState();
        currentTransactionBatch.write(inputStream);
    }

    /**
     * Closes the connection: closes the current transaction batch (if any) and then both metastore
     * clients. Calling it again is a no-op.
     *
     * <p>The closed flag is flipped with a single atomic compare-and-set so that concurrent callers
     * cannot both run the shutdown sequence. The heartbeat client is closed even when closing the
     * primary client throws.
     */
    @Override
    public void close() {
        // Only the caller that flips the flag from false to true performs the shutdown.
        if (!this.isConnectionClosed.compareAndSet(false, true)) {
            return;
        }
        try {
            if (this.currentTransactionBatch != null) {
                this.currentTransactionBatch.close();
            }
        }
        catch (StreamingException e) {
            // best effort: the clients below still have to be released
            LOG.warn("Unable to close current transaction batch: " + this.currentTransactionBatch, (Throwable)e);
        }
        finally {
            try {
                this.getMSC().close();
            }
            finally {
                // must run even if closing the primary client failed
                this.getHeatbeatMSC().close();
            }
        }
        LOG.info("Closed streaming connection. Agent: {} Stats: {}", (Object)this.getAgentInfo(), (Object)this.getConnectionStats());
    }

    /**
     * @return the live statistics object of this connection
     */
    @Override
    public ConnectionStats getConnectionStats() {
        return connectionStats;
    }

    /**
     * Creates a metastore client from {@code conf}. The configuration is modified in place first:
     * the thrift URI is set when {@code metastoreUri} is non-null and SASL is switched on when
     * {@code secureMode} is true.
     *
     * @param conf         configuration to update and connect with
     * @param metastoreUri metastore URI to store in the configuration, or null to leave it as is
     * @param secureMode   whether to enable thrift SASL
     * @param owner        label used only in the log message
     * @return the new metastore client
     * @throws ConnectionError if the client cannot be created
     */
    private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String metastoreUri, boolean secureMode, String owner) throws ConnectionError {
        if (metastoreUri != null) {
            conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
        }
        if (secureMode) {
            conf.setBoolean(MetastoreConf.ConfVars.USE_THRIFT_SASL.getHiveName(), true);
        }
        LOG.info("Creating metastore client for {}", owner);
        try {
            return HiveMetaStoreUtils.getHiveMetastoreClient(conf);
        } catch (IOException | MetaException e) {
            String message = "Error connecting to Hive Metastore URI: " + metastoreUri + ". " + e.getMessage();
            throw new ConnectionError(message, e);
        }
    }

    /**
     * Test hook exposing the state of the current transaction batch. No null check is done, so a
     * batch must already exist.
     *
     * @return the current batch's transaction state
     */
    @VisibleForTesting
    TxnState getCurrentTransactionState() {
        final TransactionBatch batch = currentTransactionBatch;
        return batch.getCurrentTransactionState();
    }

    /**
     * Test hook exposing how many transactions are left in the current batch. No null check is
     * done, so a batch must already exist.
     *
     * @return the number of unused transactions in the current batch
     */
    @VisibleForTesting
    int remainingTransactions() {
        final TransactionBatch batch = currentTransactionBatch;
        return batch.remainingTransactions();
    }

    /**
     * Test hook exposing the id of the current transaction. No null check is done, so a batch must
     * already exist.
     *
     * @return the current transaction id as reported by the batch
     */
    @VisibleForTesting
    Long getCurrentTxnId() {
        final TransactionBatch batch = currentTransactionBatch;
        return batch.getCurrentTxnId();
    }

    /**
     * Builds a new HiveConf for {@code clazz}, optionally pointing it at a metastore.
     *
     * @param clazz        class passed to the HiveConf constructor
     * @param metaStoreUri thrift URI to store in the configuration, or null to keep the default
     * @return the new configuration
     */
    private HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
        final HiveConf hiveConf = new HiveConf(clazz);
        if (metaStoreUri == null) {
            return hiveConf;
        }
        hiveConf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metaStoreUri);
        return hiveConf;
    }

    /**
     * Applies the settings this connection requires, delegating each one to a
     * {@code setHiveConf} overload: DbTxnManager as transaction manager, concurrency support,
     * execute-set-UGI, the "mr" execution engine, "nonstrict" dynamic partitioning, the ORC delta
     * streaming optimizations (only when enabled on this connection) and finally a request for
     * {@code METASTORE_CLIENT_CACHE_ENABLED = false}.
     *
     * @param conf configuration to modify in place
     */
    private void overrideConfSettings(HiveConf conf) {
        setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, DbTxnManager.class.getName());
        setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        setHiveConf(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI.getHiveName());
        setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
        setHiveConf(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
        if (streamingOptimizations) {
            setHiveConf(conf, HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED, true);
        }
        setHiveConf(conf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_ENABLED, false);
    }

    /**
     * Sets a string-valued HiveConf variable, logging the override at debug level.
     *
     * @param conf  configuration to modify
     * @param var   variable to set
     * @param value new value
     */
    private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
        if (LOG.isDebugEnabled()) {
            final String message = "Overriding HiveConf setting : " + var + " = " + value;
            LOG.debug(message);
        }
        conf.setVar(var, value);
    }

    /**
     * Sets a boolean-valued HiveConf variable, logging the override at debug level.
     *
     * <p>The given {@code value} is what gets stored. Previously the method always stored
     * {@code true}, so a request to switch a flag off (as done for the metastore client cache)
     * silently switched it on instead.
     *
     * @param conf  configuration to modify
     * @param var   variable to set
     * @param value new value
     */
    private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
        }
        conf.setBoolVar(var, value);
    }

    /**
     * Sets the configuration key {@code var} to {@code true}, logging the override at debug level.
     *
     * @param conf configuration to modify
     * @param var  name of the boolean key to enable
     */
    private static void setHiveConf(HiveConf conf, String var) {
        if (LOG.isDebugEnabled()) {
            final String message = "Overriding HiveConf setting : " + var + " = " + true;
            LOG.debug(message);
        }
        conf.setBoolean(var, true);
    }

    /**
     * @return the Hive configuration used by this connection (the live object, not a copy)
     */
    @Override
    public HiveConf getHiveConf() {
        return conf;
    }

    /**
     * @return the metastore URI read from the configuration when the connection was created
     */
    @Override
    public String getMetastoreUri() {
        return metastoreUri;
    }

    /**
     * @return the table metadata loaded during validation
     */
    @Override
    public Table getTable() {
        return tableObject;
    }

    /**
     * @return the static partition values given to the builder (the live list, possibly null)
     */
    @Override
    public List<String> getStaticPartitionValues() {
        return staticPartitionValues;
    }

    /**
     * @return the agent info string identifying this connection
     */
    @Override
    public String getAgentInfo() {
        return agentInfo;
    }

    /**
     * @return true when the validated table has partition keys
     */
    @Override
    public boolean isPartitionedTable() {
        return isPartitionedTable;
    }

    /**
     * Dynamic partitioning applies when the table is partitioned and no static partition values
     * were supplied.
     *
     * @return true when partitions are determined dynamically
     */
    @Override
    public boolean isDynamicPartitioning() {
        if (!isPartitionedTable()) {
            return false;
        }
        return staticPartitionValues == null || staticPartitionValues.isEmpty();
    }

    private static class TransactionBatch {
        /** User the transactions are opened and locked for. */
        private String username;
        /** Connection this batch belongs to. */
        private HiveStreamingConnection conn;
        /** Scheduler running the heartbeat task; created in setupHeartBeatThread, shut down in closeImpl. */
        private ScheduledExecutorService scheduledExecutorService;
        /** Record writer taken from the connection. */
        private RecordWriter recordWriter;
        /** Partition name used in lock requests; only set for a partitioned table with static partition values. */
        private String partNameForLock = null;
        /** Transaction-id/write-id pairs allocated for this batch, in order of use. */
        private List<TxnToWriteId> txnToWriteIds;
        /** Index into txnToWriteIds of the current transaction; -1 until the first one is begun. */
        private int currentTxnIndex = -1;
        /** State of the current transaction as tracked by this batch. */
        private TxnState state;
        /** Lock request built for the most recently begun transaction. */
        private LockRequest lockRequest = null;
        /** Held around commit/abort metastore calls and by the heartbeat task. */
        private final ReentrantLock transactionLock = new ReentrantLock();
        /** Lower bound of the heartbeat range; set to -1 when no transaction is left (heartbeat only runs when > 0). */
        private final AtomicLong minTxnId;
        /** Upper bound of the heartbeat range: the last transaction id of the batch. */
        private final long maxTxnId;
        /** Set when the batch is closed; also set by the heartbeat task on heartbeat failure. */
        private final AtomicBoolean isTxnClosed = new AtomicBoolean(false);
        /** Agent info taken from the connection; passed into lock requests. */
        private String agentInfo;
        /** Number of transactions in this batch (the connection's transactionBatchSize). */
        private int numTxns;
        /** Per-transaction state, parallel to txnToWriteIds; printed by toString(). */
        private TxnState[] txnStatus;
        /** Id of the transaction most recently begun. */
        private long lastTxnUsed;

        /**
         * Opens a batch of transactions on the metastore, allocates their write ids, initializes
         * the record writer with the write-id range and finally starts the heartbeat task.
         *
         * <p>The heartbeat task is scheduled only after {@code minTxnId} and {@code maxTxnId} are
         * assigned. The heartbeat runnable captures both values when it is constructed, so
         * scheduling it earlier hands it a null {@code minTxnId}; the first run then fails and the
         * scheduler stops running the task, leaving the batch without heartbeats.
         *
         * <p>If any step fails the batch is closed via {@code markDead(false)}.
         *
         * @param conn the owning connection
         * @throws StreamingException if the transactions cannot be opened, write ids cannot be
         *                            allocated or the record writer cannot be initialized
         */
        private TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
            boolean success = false;
            try {
                if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
                    // static partitioning: locks are taken on the one target partition
                    List partKeys = conn.tableObject.getPartitionKeys();
                    this.partNameForLock = Warehouse.makePartName((List)partKeys, (List)conn.staticPartitionValues);
                }
                this.conn = conn;
                this.username = conn.username;
                this.recordWriter = conn.recordWriter;
                this.agentInfo = conn.agentInfo;
                this.numTxns = conn.transactionBatchSize;
                List<Long> txnIds = this.openTxnImpl(this.username, this.numTxns);
                this.txnToWriteIds = this.allocateWriteIdsImpl(txnIds);
                assert (this.txnToWriteIds.size() == this.numTxns);
                this.txnStatus = new TxnState[this.numTxns];
                for (int i = 0; i < this.txnStatus.length; ++i) {
                    assert (this.txnToWriteIds.get(i).getTxnId() == txnIds.get(i).longValue());
                    this.txnStatus[i] = TxnState.OPEN;
                }
                this.state = TxnState.INACTIVE;
                this.recordWriter.init(conn, this.txnToWriteIds.get(0).getWriteId(), this.txnToWriteIds.get(this.numTxns - 1).getWriteId());
                this.minTxnId = new AtomicLong(txnIds.get(0));
                this.maxTxnId = txnIds.get(txnIds.size() - 1);
                // Must come after minTxnId/maxTxnId are set: the runnable captures them at creation.
                this.setupHeartBeatThread();
                success = true;
            }
            catch (TException e) {
                throw new StreamingException(conn.toString(), e);
            }
            finally {
                // on failure this closes the batch (abort + writer/heartbeat cleanup)
                this.markDead(success);
            }
        }

        /**
         * Creates a single daemon scheduler thread and schedules the heartbeat task on it with a
         * fixed delay. The interval comes from {@code DbTxnManager.getHeartbeatInterval} (60000 ms
         * if that lookup fails); the first run is delayed by a random fraction, up to 75%, of the
         * interval.
         *
         * <p>NOTE: the runnable is constructed with {@code minTxnId} and {@code maxTxnId} as they
         * are at the time of this call.
         */
        private void setupHeartBeatThread() {
            ThreadFactory heartbeatThreads = new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
                .build();
            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(heartbeatThreads);

            long intervalMs;
            try {
                intervalMs = DbTxnManager.getHeartbeatInterval(conn.conf);
            } catch (LockException e) {
                // interval could not be derived from the configuration; use the fixed fallback
                intervalMs = 60000L;
            }
            long firstRunDelayMs = (long)((double)intervalMs * 0.75 * Math.random());
            LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", intervalMs, firstRunDelayMs, conn.agentInfo);

            Runnable heartbeat = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed);
            scheduledExecutorService.scheduleWithFixedDelay(heartbeat, firstRunDelayMs, intervalMs, TimeUnit.MILLISECONDS);
        }

        /**
         * Opens {@code numTxns} transactions on the metastore for {@code user}.
         *
         * @param user    user to open the transactions for
         * @param numTxns number of transactions to open
         * @return the ids of the opened transactions
         * @throws TException if the metastore call fails
         */
        private List<Long> openTxnImpl(String user, int numTxns) throws TException {
            final IMetaStoreClient client = conn.getMSC();
            return client.openTxns(user, numTxns).getTxn_ids();
        }

        /**
         * Allocates table write ids for the given transactions on the connection's table.
         *
         * @param txnIds ids of the transactions to allocate write ids for
         * @return the transaction-id/write-id pairs returned by the metastore
         * @throws TException if the metastore call fails
         */
        private List<TxnToWriteId> allocateWriteIdsImpl(List<Long> txnIds) throws TException {
            final IMetaStoreClient client = conn.getMSC();
            return client.allocateTableWriteIdsBatch(txnIds, conn.database, conn.table);
        }

        /**
         * Describes the batch: first and last txn-id/write-id pair, the owning connection, the
         * per-transaction status and the last transaction used.
         *
         * @return the description, or "{}" when no write ids have been allocated
         */
        @Override
        public String toString() {
            if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
                return "{}";
            }
            TxnToWriteId first = txnToWriteIds.get(0);
            TxnToWriteId last = txnToWriteIds.get(txnToWriteIds.size() - 1);

            StringBuilder text = new StringBuilder("TxnId/WriteIds=[");
            text.append(first.getTxnId()).append("/").append(first.getWriteId());
            text.append("...");
            text.append(last.getTxnId()).append("/").append(last.getWriteId());
            text.append("] on connection = ").append(conn).append("; ");

            text.append(" TxnStatus[");
            for (TxnState status : txnStatus) {
                if (status == null) {
                    text.append("N");
                } else {
                    text.append(status);
                }
            }
            text.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
            return text.toString();
        }

        /**
         * Moves this batch on to its next transaction.
         *
         * @throws StreamingException if the batch is closed or the next transaction cannot be begun
         */
        private void beginNextTransaction() throws StreamingException {
            checkIsClosed();
            beginNextTransactionImpl();
        }

        /**
         * Advances to the next transaction of the batch, marks it OPEN and requests its lock from
         * the metastore.
         *
         * @throws TransactionError if the batch has no transaction left, the lock call fails or
         *                          the lock is not granted immediately
         */
        private void beginNextTransactionImpl() throws TransactionError {
            state = TxnState.INACTIVE;
            final int nextIndex = currentTxnIndex + 1;
            if (nextIndex >= txnToWriteIds.size()) {
                throw new InvalidTransactionState("No more transactions available in next batch for connection: " + conn + " user: " + username);
            }
            currentTxnIndex = nextIndex;
            state = TxnState.OPEN;

            final long txnId = getCurrentTxnId();
            lastTxnUsed = txnId;
            lockRequest = createLockRequest(conn, partNameForLock, username, txnId, agentInfo);

            LockResponse response;
            try {
                response = conn.getMSC().lock(lockRequest);
            } catch (TException e) {
                throw new TransactionError("Unable to acquire lock on " + conn, e);
            }
            if (response.getState() != LockState.ACQUIRED) {
                throw new TransactionError("Unable to acquire lock on " + conn);
            }
        }

        /**
         * @return the id of the current transaction, or -1 if none has been begun yet
         */
        long getCurrentTxnId() {
            return currentTxnIndex < 0 ? -1L : txnToWriteIds.get(currentTxnIndex).getTxnId();
        }

        /**
         * @return the write id of the current transaction, or -1 if none has been begun yet
         */
        long getCurrentWriteId() {
            return currentTxnIndex < 0 ? -1L : txnToWriteIds.get(currentTxnIndex).getWriteId();
        }

        /**
         * @return the state of the current transaction as tracked by this batch
         */
        TxnState getCurrentTransactionState() {
            return state;
        }

        /**
         * @return the number of transactions of this batch that have not been begun yet
         */
        int remainingTransactions() {
            final int total = txnToWriteIds.size();
            return currentTxnIndex >= 0 ? total - currentTxnIndex - 1 : total;
        }

        /**
         * Writes one record under the current write id. Any failure other than a
         * {@code SerializationError} closes the batch; a serialization error is rethrown but
         * leaves the batch usable.
         *
         * @param record the record bytes handed to the record writer
         * @throws StreamingException if the batch is closed or the write fails
         */
        public void write(byte[] record) throws StreamingException {
            checkIsClosed();
            boolean batchStillUsable = false;
            try {
                recordWriter.write(getCurrentWriteId(), record);
                batchStillUsable = true;
            } catch (SerializationError badRecord) {
                // a bad record does not invalidate the batch
                batchStillUsable = true;
                throw badRecord;
            } finally {
                markDead(batchStillUsable);
            }
        }

        /**
         * Writes the contents of a stream under the current write id. Any failure other than a
         * {@code SerializationError} closes the batch; a serialization error is rethrown but
         * leaves the batch usable.
         *
         * @param inputStream the stream handed to the record writer
         * @throws StreamingException if the batch is closed or the write fails
         */
        public void write(InputStream inputStream) throws StreamingException {
            checkIsClosed();
            boolean batchStillUsable = false;
            try {
                recordWriter.write(getCurrentWriteId(), inputStream);
                batchStillUsable = true;
            } catch (SerializationError badRecord) {
                // a bad record does not invalidate the batch
                batchStillUsable = true;
                throw badRecord;
            } finally {
                markDead(batchStillUsable);
            }
        }

        /**
         * Rejects operations on a batch whose closed flag is set.
         *
         * @throws StreamingException if the batch is closed
         */
        private void checkIsClosed() throws StreamingException {
            final boolean closed = isTxnClosed.get();
            if (closed) {
                throw new StreamingException("Transaction" + this + " is closed()");
            }
        }

        /**
         * Closes the batch when the preceding operation did not succeed; does nothing otherwise.
         *
         * @param success outcome of the operation that just ran
         * @throws StreamingException if closing the batch fails
         */
        private void markDead(boolean success) throws StreamingException {
            if (!success) {
                close();
            }
        }

        /**
         * Commits the current transaction; the batch is closed if the commit fails.
         *
         * @throws StreamingException if the batch is closed or the commit fails
         */
        void commit() throws StreamingException {
            checkIsClosed();
            boolean committed = false;
            try {
                commitImpl();
                committed = true;
            } finally {
                markDead(committed);
            }
        }

        /**
         * Flushes the record writer, registers dynamically created partitions (when dynamic
         * partitioning is in use) and commits the current transaction on the metastore. On success
         * the heartbeat lower bound moves to the next transaction of the batch, or to -1 when this
         * was the last one.
         *
         * @throws StreamingException if flushing fails, or a {@code TransactionError} when any
         *                            metastore call fails
         */
        private void commitImpl() throws StreamingException {
            try {
                // flush the writer before the transaction is committed
                this.recordWriter.flush();
                TxnToWriteId txnToWriteId = this.txnToWriteIds.get(this.currentTxnIndex);
                if (this.conn.isDynamicPartitioning()) {
                    // report the partitions the writer touched for this transaction
                    ArrayList<String> partNames = new ArrayList<String>(this.recordWriter.getPartitions());
                    this.conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), this.conn.database, this.conn.table, partNames, DataOperationType.INSERT);
                }
                // the heartbeat task takes the same lock, so commit and the minTxnId update
                // happen without a heartbeat running in between
                this.transactionLock.lock();
                try {
                    this.conn.getMSC().commitTxn(txnToWriteId.getTxnId());
                    if (this.currentTxnIndex + 1 < this.txnToWriteIds.size()) {
                        this.minTxnId.set(this.txnToWriteIds.get(this.currentTxnIndex + 1).getTxnId());
                    } else {
                        // last transaction of the batch: nothing left to heartbeat
                        this.minTxnId.set(-1L);
                    }
                }
                finally {
                    this.transactionLock.unlock();
                }
                this.state = TxnState.COMMITTED;
                this.txnStatus[this.currentTxnIndex] = TxnState.COMMITTED;
            }
            catch (NoSuchTxnException e) {
                throw new TransactionError("Invalid transaction id : " + this.getCurrentTxnId(), (Exception)((Object)e));
            }
            catch (TxnAbortedException e) {
                throw new TransactionError("Aborted transaction cannot be committed", (Exception)((Object)e));
            }
            catch (TException e) {
                throw new TransactionError("Unable to commitTransaction transaction" + this.getCurrentTxnId(), (Exception)((Object)e));
            }
        }

        /**
         * Aborts the current transaction only; does nothing when the batch is already closed.
         *
         * @throws StreamingException if the abort fails
         */
        void abort() throws StreamingException {
            if (!isTxnClosed.get()) {
                abort(false);
            }
        }

        /**
         * Aborts the current transaction, or every unresolved transaction of the batch.
         *
         * @param abortAllRemaining true to abort all remaining transactions, false for only the
         *                          current one
         * @throws StreamingException if the abort fails
         */
        private void abort(final boolean abortAllRemaining) throws StreamingException {
            abortImpl(abortAllRemaining);
        }

        /**
         * Rolls back transactions on the metastore while holding the transaction lock.
         *
         * <p>With {@code abortAllRemaining} every transaction from the first unresolved one to the
         * end of the batch is rolled back; otherwise only the current transaction is. In both cases
         * the batch state ends up ABORTED.
         *
         * @param abortAllRemaining true to roll back all remaining transactions of the batch
         * @throws StreamingException a {@code TransactionError} when a metastore call fails
         */
        private void abortImpl(boolean abortAllRemaining) throws StreamingException {
            // minTxnId is assigned late in the constructor; null means the batch never finished
            // initializing, so there is nothing to roll back here
            if (this.minTxnId == null) {
                return;
            }
            this.transactionLock.lock();
            try {
                if (abortAllRemaining) {
                    int minOpenTxnIndex;
                    // the heartbeat task only sends heartbeats while minTxnId > 0
                    this.minTxnId.set(-1L);
                    // start at the current transaction, or at the one after it when the current
                    // one is already resolved; never below index 0
                    this.currentTxnIndex = minOpenTxnIndex = Math.max(this.currentTxnIndex + (this.state == TxnState.ABORTED || this.state == TxnState.COMMITTED ? 1 : 0), 0);
                    while (this.currentTxnIndex < this.txnToWriteIds.size()) {
                        this.conn.getMSC().rollbackTxn(this.txnToWriteIds.get(this.currentTxnIndex).getTxnId());
                        this.txnStatus[this.currentTxnIndex] = TxnState.ABORTED;
                        ++this.currentTxnIndex;
                    }
                    // the loop leaves the index one past the end; step back onto the last entry
                    --this.currentTxnIndex;
                } else {
                    // move the heartbeat lower bound past the transaction being aborted
                    if (this.currentTxnIndex + 1 < this.txnToWriteIds.size()) {
                        this.minTxnId.set(this.txnToWriteIds.get(this.currentTxnIndex + 1).getTxnId());
                    } else {
                        this.minTxnId.set(-1L);
                    }
                    long currTxnId = this.getCurrentTxnId();
                    // getCurrentTxnId() returns -1 when no transaction has been begun yet
                    if (currTxnId > 0L) {
                        this.conn.getMSC().rollbackTxn(currTxnId);
                        this.txnStatus[this.currentTxnIndex] = TxnState.ABORTED;
                    }
                }
                this.state = TxnState.ABORTED;
            }
            catch (NoSuchTxnException e) {
                throw new TransactionError("Unable to abort invalid transaction id : " + this.getCurrentTxnId(), (Exception)((Object)e));
            }
            catch (TException e) {
                throw new TransactionError("Unable to abort transaction id : " + this.getCurrentTxnId(), (Exception)((Object)e));
            }
            finally {
                this.transactionLock.unlock();
            }
        }

        /**
         * @return true once the batch's closed flag has been set
         */
        public boolean isClosed() {
            final boolean closed = isTxnClosed.get();
            return closed;
        }

        /**
         * Closes the batch: aborts every remaining transaction, then releases the record writer
         * and the heartbeat scheduler. Calling it on an already closed batch is a no-op.
         *
         * <p>The release step always runs, even when the abort fails. The closed flag is set
         * before the abort, so a later {@code close()} call would return immediately and could not
         * make up for a skipped cleanup. When both steps fail, the abort failure is thrown and the
         * close failure is attached to it as a suppressed exception.
         *
         * @throws StreamingException "Unable to abort" if the abort failed, otherwise
         *                            "Unable to close" if releasing resources failed
         */
        public void close() throws StreamingException {
            if (this.isTxnClosed.get()) {
                return;
            }
            this.isTxnClosed.set(true);
            StreamingException failure = null;
            try {
                this.abort(true);
            }
            catch (Exception ex) {
                LOG.error("Fatal error on " + this.toString() + "; cause " + ex.getMessage(), (Throwable)ex);
                // remember the failure, but still release the writer and the heartbeat scheduler
                failure = new StreamingException("Unable to abort", ex);
            }
            try {
                this.closeImpl();
            }
            catch (Exception ex) {
                LOG.error("Fatal error on " + this.toString() + "; cause " + ex.getMessage(), (Throwable)ex);
                StreamingException closeFailure = new StreamingException("Unable to close", ex);
                if (failure == null) {
                    failure = closeFailure;
                } else {
                    failure.addSuppressed(closeFailure);
                }
            }
            if (failure != null) {
                throw failure;
            }
        }

        /**
         * Releases the batch's resources: marks the state INACTIVE, closes the record writer and
         * stops the heartbeat scheduler. The scheduler is stopped even when closing the record
         * writer throws, so the heartbeat thread is not left running.
         *
         * @throws StreamingException if closing the record writer fails
         */
        private void closeImpl() throws StreamingException {
            this.state = TxnState.INACTIVE;
            try {
                this.recordWriter.close();
            }
            finally {
                // may be null when the batch failed before the heartbeat was set up
                if (this.scheduledExecutorService != null) {
                    this.scheduledExecutorService.shutdownNow();
                }
            }
        }

        /**
         * Builds the metastore lock request for one transaction: a shared INSERT lock on the
         * connection's table, flagged as a dynamic-partition write when the connection uses
         * dynamic partitioning and narrowed to a partition when a partition name is given.
         *
         * @param connection      connection supplying database, table and partitioning mode
         * @param partNameForLock partition name to lock, or null/empty for none
         * @param user            user the lock is requested for
         * @param txnId           transaction the lock belongs to
         * @param agentInfo       agent info passed to the request builder
         * @return the assembled lock request
         */
        static LockRequest createLockRequest(HiveStreamingConnection connection, String partNameForLock, String user, long txnId, String agentInfo) {
            LockComponentBuilder component = new LockComponentBuilder()
                .setDbName(connection.database)
                .setTableName(connection.table)
                .setShared()
                .setOperationType(DataOperationType.INSERT);
            if (connection.isDynamicPartitioning()) {
                component.setIsDynamicPartitionWrite(true);
            }
            boolean lockSinglePartition = partNameForLock != null && !partNameForLock.isEmpty();
            if (lockSinglePartition) {
                component.setPartitionName(partNameForLock);
            }

            LockRequestBuilder request = new LockRequestBuilder(agentInfo);
            request.setUser(user);
            request.setTransactionId(txnId);
            request.addLockComponent(component.build());
            return request.build();
        }
    }

    private static class HeartbeatRunnable
    implements Runnable {
        private final HiveStreamingConnection conn;
        private final AtomicLong minTxnId;
        private final long maxTxnId;
        private final ReentrantLock transactionLock;
        private final AtomicBoolean isTxnClosed;

        HeartbeatRunnable(HiveStreamingConnection conn, AtomicLong minTxnId, long maxTxnId, ReentrantLock transactionLock, AtomicBoolean isTxnClosed) {
            this.conn = conn;
            this.minTxnId = minTxnId;
            this.maxTxnId = maxTxnId;
            this.transactionLock = transactionLock;
            this.isTxnClosed = isTxnClosed;
        }

        @Override
        public void run() {
            this.transactionLock.lock();
            try {
                if (this.minTxnId.get() > 0L) {
                    HeartbeatTxnRangeResponse resp = this.conn.getHeatbeatMSC().heartbeatTxnRange(this.minTxnId.get(), this.maxTxnId);
                    if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
                        LOG.error("Heartbeat failure: {}", (Object)resp.toString());
                        this.isTxnClosed.set(true);
                    } else {
                        LOG.info("Heartbeat sent for range: [{}-{}]", (Object)this.minTxnId.get(), (Object)this.maxTxnId);
                    }
                }
            }
            catch (TException e) {
                LOG.warn("Failure to heartbeat for transaction range: [" + this.minTxnId.get() + "-" + this.maxTxnId + "]", (Throwable)e);
            }
            finally {
                this.transactionLock.unlock();
            }
        }
    }

    /**
     * Fluent builder for {@link HiveStreamingConnection}.
     *
     * <p>Database, table and record writer are mandatory; {@link #connect()}
     * rejects a builder that is missing any of them. The transaction batch size
     * defaults to 1 and streaming optimizations default to enabled.
     */
    public static class Builder {
        private String database;
        private String table;
        private List<String> staticPartitionValues;
        private String agentInfo;
        private HiveConf hiveConf;
        private int transactionBatchSize = 1;
        private boolean streamingOptimizations = true;
        private RecordWriter recordWriter;

        /**
         * @param database name of the target database (required)
         * @return this builder
         */
        public Builder withDatabase(String database) {
            this.database = database;
            return this;
        }

        /**
         * @param table name of the target table (required)
         * @return this builder
         */
        public Builder withTable(String table) {
            this.table = table;
            return this;
        }

        /**
         * Stores a private copy of the given static partition values.
         *
         * @param staticPartitionValues partition values, or {@code null} for none
         * @return this builder
         */
        public Builder withStaticPartitionValues(List<String> staticPartitionValues) {
            if (staticPartitionValues == null) {
                this.staticPartitionValues = null;
            } else {
                // Copy so later changes to the caller's list do not affect the builder.
                this.staticPartitionValues = new ArrayList<String>(staticPartitionValues);
            }
            return this;
        }

        /**
         * @param agentInfo agent info string stored on the builder
         * @return this builder
         */
        public Builder withAgentInfo(String agentInfo) {
            this.agentInfo = agentInfo;
            return this;
        }

        /**
         * @param hiveConf Hive configuration stored on the builder
         * @return this builder
         */
        public Builder withHiveConf(HiveConf hiveConf) {
            this.hiveConf = hiveConf;
            return this;
        }

        /**
         * @param transactionBatchSize transaction batch size; stored without validation
         * @return this builder
         */
        @InterfaceStability.Evolving
        public Builder withTransactionBatchSize(int transactionBatchSize) {
            this.transactionBatchSize = transactionBatchSize;
            return this;
        }

        /**
         * @param enable whether streaming optimizations are turned on
         * @return this builder
         */
        public Builder withStreamingOptimizations(boolean enable) {
            this.streamingOptimizations = enable;
            return this;
        }

        /**
         * @param recordWriter writer used by the connection (required)
         * @return this builder
         */
        public Builder withRecordWriter(RecordWriter recordWriter) {
            this.recordWriter = recordWriter;
            return this;
        }

        /**
         * Validates the mandatory settings and creates the streaming connection.
         *
         * <p>Besides constructing the connection, this registers a shutdown hook
         * that closes it and installs a JVM-wide default uncaught-exception
         * handler that closes it as well.
         *
         * @return the newly created connection
         * @throws StreamingException if database, table or record writer is unset
         */
        public HiveStreamingConnection connect() throws StreamingException {
            Builder.requireConfigured(database, "Database cannot be null for streaming connection");
            Builder.requireConfigured(table, "Table cannot be null for streaming connection");
            Builder.requireConfigured(recordWriter, "Record writer cannot be null for streaming connection");

            HiveStreamingConnection connection = new HiveStreamingConnection(this);
            // NOTE(review): 11 is presumably FileSystem.SHUTDOWN_HOOK_PRIORITY + 1 inlined
            // by the compiler, so this hook runs before the filesystem one - confirm.
            ShutdownHookManager.addShutdownHook(connection::close, 11);
            // NOTE(review): replaces any default handler already installed for the whole JVM.
            Thread.setDefaultUncaughtExceptionHandler((thread, error) -> connection.close());
            return connection;
        }

        /** Throws a {@link StreamingException} carrying {@code message} when {@code value} is null. */
        private static void requireConfigured(Object value, String message) throws StreamingException {
            if (value == null) {
                throw new StreamingException(message);
            }
        }
    }

    /**
     * States of a streaming transaction. Each constant carries a one-letter
     * code, which is what {@link #toString()} returns.
     */
    public enum TxnState {
        /** Code {@code "I"}. */
        INACTIVE("I"),
        /** Code {@code "O"}. */
        OPEN("O"),
        /** Code {@code "C"}. */
        COMMITTED("C"),
        /** Code {@code "A"}. */
        ABORTED("A");

        /** One-letter representation of the state. */
        private final String code;

        TxnState(String stateCode) {
            this.code = stateCode;
        }

        /**
         * @return the one-letter code of this state rather than the constant name
         */
        @Override
        public String toString() {
            return code;
        }
    }
}

