package org.apache.hive.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection.class */
public class HiveStreamingConnection implements StreamingConnection {
    private static final Logger LOG = LoggerFactory.getLogger(HiveStreamingConnection.class.getName());
    private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083";
    private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1;
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 60000;
    private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true;
    private String database;
    private String table;
    private List<String> staticPartitionValues;
    private String agentInfo;
    private int transactionBatchSize;
    private RecordWriter recordWriter;
    private TransactionBatch currentTransactionBatch;
    private HiveConf conf;
    private boolean streamingOptimizations;
    private AtomicBoolean isConnectionClosed;
    private boolean isPartitionedTable;
    private IMetaStoreClient msClient;
    private IMetaStoreClient heartbeatMSClient;
    private final String username;
    private final boolean secureMode;
    private Table tableObject;
    private String metastoreUri;
    private ConnectionStats connectionStats;

    /* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection$Builder.class */
    public static class Builder {
        private String database;
        private String table;
        private List<String> staticPartitionValues;
        private String agentInfo;
        private HiveConf hiveConf;
        private int transactionBatchSize = 1;
        private boolean streamingOptimizations = true;
        private RecordWriter recordWriter;

        public Builder withDatabase(String str) {
            this.database = str;
            return this;
        }

        public Builder withTable(String str) {
            this.table = str;
            return this;
        }

        public Builder withStaticPartitionValues(List<String> list) {
            this.staticPartitionValues = list == null ? null : new ArrayList(list);
            return this;
        }

        public Builder withAgentInfo(String str) {
            this.agentInfo = str;
            return this;
        }

        public Builder withHiveConf(HiveConf hiveConf) {
            this.hiveConf = hiveConf;
            return this;
        }

        @InterfaceStability.Evolving
        public Builder withTransactionBatchSize(int i) {
            this.transactionBatchSize = i;
            return this;
        }

        public Builder withStreamingOptimizations(boolean z) {
            this.streamingOptimizations = z;
            return this;
        }

        public Builder withRecordWriter(RecordWriter recordWriter) {
            this.recordWriter = recordWriter;
            return this;
        }

        public HiveStreamingConnection connect() throws StreamingException {
            if (this.database == null) {
                throw new StreamingException("Database cannot be null for streaming connection");
            }
            if (this.table == null) {
                throw new StreamingException("Table cannot be null for streaming connection");
            }
            if (this.recordWriter == null) {
                throw new StreamingException("Record writer cannot be null for streaming connection");
            }
            HiveStreamingConnection hiveStreamingConnection = new HiveStreamingConnection(this);
            hiveStreamingConnection.getClass();
            ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close, 11);
            Thread.setDefaultUncaughtExceptionHandler((thread, th) -> {
                hiveStreamingConnection.close();
            });
            return hiveStreamingConnection;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection$HeartbeatRunnable.class */
    public static class HeartbeatRunnable implements Runnable {
        private final HiveStreamingConnection conn;
        private final AtomicLong minTxnId;
        private final long maxTxnId;
        private final ReentrantLock transactionLock;
        private final AtomicBoolean isTxnClosed;

        HeartbeatRunnable(HiveStreamingConnection hiveStreamingConnection, AtomicLong atomicLong, long j, ReentrantLock reentrantLock, AtomicBoolean atomicBoolean) {
            this.conn = hiveStreamingConnection;
            this.minTxnId = atomicLong;
            this.maxTxnId = j;
            this.transactionLock = reentrantLock;
            this.isTxnClosed = atomicBoolean;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.transactionLock.lock();
            try {
                if (this.minTxnId.get() > 0) {
                    HeartbeatTxnRangeResponse heartbeatTxnRange = this.conn.getHeatbeatMSC().heartbeatTxnRange(this.minTxnId.get(), this.maxTxnId);
                    if (heartbeatTxnRange.getAborted().isEmpty() && heartbeatTxnRange.getNosuch().isEmpty()) {
                        HiveStreamingConnection.LOG.info("Heartbeat sent for range: [{}-{}]", Long.valueOf(this.minTxnId.get()), Long.valueOf(this.maxTxnId));
                    } else {
                        HiveStreamingConnection.LOG.error("Heartbeat failure: {}", heartbeatTxnRange.toString());
                        this.isTxnClosed.set(true);
                    }
                }
            } catch (TException e) {
                HiveStreamingConnection.LOG.warn("Failure to heartbeat for transaction range: [" + this.minTxnId.get() + "-" + this.maxTxnId + "]", e);
            } finally {
                this.transactionLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection$TransactionBatch.class */
    public static class TransactionBatch {
        private String username;
        private HiveStreamingConnection conn;
        private ScheduledExecutorService scheduledExecutorService;
        private RecordWriter recordWriter;
        private String partNameForLock;
        private List<TxnToWriteId> txnToWriteIds;
        private int currentTxnIndex;
        private TxnState state;
        private LockRequest lockRequest;
        private final ReentrantLock transactionLock;
        private final AtomicLong minTxnId;
        private final long maxTxnId;
        private final AtomicBoolean isTxnClosed;
        private String agentInfo;
        private int numTxns;
        private TxnState[] txnStatus;
        private long lastTxnUsed;
        static final /* synthetic */ boolean $assertionsDisabled;

        private TransactionBatch(HiveStreamingConnection hiveStreamingConnection) throws StreamingException {
            this.partNameForLock = null;
            this.currentTxnIndex = -1;
            this.lockRequest = null;
            this.transactionLock = new ReentrantLock();
            this.isTxnClosed = new AtomicBoolean(false);
            try {
                try {
                    if (hiveStreamingConnection.isPartitionedTable() && !hiveStreamingConnection.isDynamicPartitioning()) {
                        this.partNameForLock = Warehouse.makePartName(hiveStreamingConnection.tableObject.getPartitionKeys(), hiveStreamingConnection.staticPartitionValues);
                    }
                    this.conn = hiveStreamingConnection;
                    this.username = hiveStreamingConnection.username;
                    this.recordWriter = hiveStreamingConnection.recordWriter;
                    this.agentInfo = hiveStreamingConnection.agentInfo;
                    this.numTxns = hiveStreamingConnection.transactionBatchSize;
                    setupHeartBeatThread();
                    List<Long> openTxnImpl = openTxnImpl(this.username, this.numTxns);
                    this.txnToWriteIds = allocateWriteIdsImpl(openTxnImpl);
                    if (!$assertionsDisabled && this.txnToWriteIds.size() != this.numTxns) {
                        throw new AssertionError();
                    }
                    this.txnStatus = new TxnState[this.numTxns];
                    for (int i = 0; i < this.txnStatus.length; i++) {
                        if (!$assertionsDisabled && this.txnToWriteIds.get(i).getTxnId() != openTxnImpl.get(i).longValue()) {
                            throw new AssertionError();
                        }
                        this.txnStatus[i] = TxnState.OPEN;
                    }
                    this.state = TxnState.INACTIVE;
                    this.recordWriter.init(hiveStreamingConnection, this.txnToWriteIds.get(0).getWriteId(), this.txnToWriteIds.get(this.numTxns - 1).getWriteId());
                    this.minTxnId = new AtomicLong(openTxnImpl.get(0).longValue());
                    this.maxTxnId = openTxnImpl.get(openTxnImpl.size() - 1).longValue();
                    markDead(true);
                } catch (TException e) {
                    throw new StreamingException(hiveStreamingConnection.toString(), e);
                }
            } catch (Throwable th) {
                markDead(false);
                throw th;
            }
        }

        private void setupHeartBeatThread() {
            long j;
            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HiveStreamingConnection-Heartbeat-Thread").build());
            try {
                j = DbTxnManager.getHeartbeatInterval(this.conn.conf);
            } catch (LockException e) {
                j = 60000;
            }
            long random = (long) (j * 0.75d * Math.random());
            HiveStreamingConnection.LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", new Object[]{Long.valueOf(j), Long.valueOf(random), this.conn.agentInfo});
            this.scheduledExecutorService.scheduleWithFixedDelay(new HeartbeatRunnable(this.conn, this.minTxnId, this.maxTxnId, this.transactionLock, this.isTxnClosed), random, j, TimeUnit.MILLISECONDS);
        }

        private List<Long> openTxnImpl(String str, int i) throws TException {
            return this.conn.getMSC().openTxns(str, i).getTxn_ids();
        }

        private List<TxnToWriteId> allocateWriteIdsImpl(List<Long> list) throws TException {
            return this.conn.getMSC().allocateTableWriteIdsBatch(list, this.conn.database, this.conn.table);
        }

        public String toString() {
            if (this.txnToWriteIds == null || this.txnToWriteIds.isEmpty()) {
                return "{}";
            }
            StringBuilder sb = new StringBuilder(" TxnStatus[");
            TxnState[] txnStateArr = this.txnStatus;
            int length = txnStateArr.length;
            for (int i = 0; i < length; i++) {
                TxnState txnState = txnStateArr[i];
                sb.append(txnState == null ? "N" : txnState);
            }
            sb.append("] LastUsed ").append(JavaUtils.txnIdToString(this.lastTxnUsed));
            return "TxnId/WriteIds=[" + this.txnToWriteIds.get(0).getTxnId() + "/" + this.txnToWriteIds.get(0).getWriteId() + "..." + this.txnToWriteIds.get(this.txnToWriteIds.size() - 1).getTxnId() + "/" + this.txnToWriteIds.get(this.txnToWriteIds.size() - 1).getWriteId() + "] on connection = " + this.conn + "; " + ((Object) sb);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void beginNextTransaction() throws StreamingException {
            checkIsClosed();
            beginNextTransactionImpl();
        }

        private void beginNextTransactionImpl() throws TransactionError {
            this.state = TxnState.INACTIVE;
            if (this.currentTxnIndex + 1 >= this.txnToWriteIds.size()) {
                throw new InvalidTransactionState("No more transactions available in next batch for connection: " + this.conn + " user: " + this.username);
            }
            this.currentTxnIndex++;
            this.state = TxnState.OPEN;
            this.lastTxnUsed = getCurrentTxnId();
            this.lockRequest = createLockRequest(this.conn, this.partNameForLock, this.username, getCurrentTxnId(), this.agentInfo);
            try {
                if (this.conn.getMSC().lock(this.lockRequest).getState() != LockState.ACQUIRED) {
                    throw new TransactionError("Unable to acquire lock on " + this.conn);
                }
            } catch (TException e) {
                throw new TransactionError("Unable to acquire lock on " + this.conn, e);
            }
        }

        long getCurrentTxnId() {
            if (this.currentTxnIndex >= 0) {
                return this.txnToWriteIds.get(this.currentTxnIndex).getTxnId();
            }
            return -1L;
        }

        long getCurrentWriteId() {
            if (this.currentTxnIndex >= 0) {
                return this.txnToWriteIds.get(this.currentTxnIndex).getWriteId();
            }
            return -1L;
        }

        TxnState getCurrentTransactionState() {
            return this.state;
        }

        int remainingTransactions() {
            return this.currentTxnIndex >= 0 ? (this.txnToWriteIds.size() - this.currentTxnIndex) - 1 : this.txnToWriteIds.size();
        }

        public void write(byte[] bArr) throws StreamingException {
            checkIsClosed();
            boolean z = false;
            try {
                try {
                    this.recordWriter.write(getCurrentWriteId(), bArr);
                    z = true;
                    markDead(true);
                } catch (SerializationError e) {
                    throw e;
                }
            } catch (Throwable th) {
                markDead(z);
                throw th;
            }
        }

        public void write(InputStream inputStream) throws StreamingException {
            checkIsClosed();
            boolean z = false;
            try {
                try {
                    this.recordWriter.write(getCurrentWriteId(), inputStream);
                    z = true;
                    markDead(true);
                } catch (SerializationError e) {
                    throw e;
                }
            } catch (Throwable th) {
                markDead(z);
                throw th;
            }
        }

        private void checkIsClosed() throws StreamingException {
            if (this.isTxnClosed.get()) {
                throw new StreamingException("Transaction" + toString() + " is closed()");
            }
        }

        private void markDead(boolean z) throws StreamingException {
            if (z) {
                return;
            }
            close();
        }

        void commit() throws StreamingException {
            checkIsClosed();
            boolean z = false;
            try {
                commitImpl();
                z = true;
                markDead(true);
            } catch (Throwable th) {
                markDead(z);
                throw th;
            }
        }

        private void commitImpl() throws StreamingException {
            try {
                this.recordWriter.flush();
                TxnToWriteId txnToWriteId = this.txnToWriteIds.get(this.currentTxnIndex);
                if (this.conn.isDynamicPartitioning()) {
                    this.conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), this.conn.database, this.conn.table, new ArrayList(this.recordWriter.getPartitions()), DataOperationType.INSERT);
                }
                this.transactionLock.lock();
                try {
                    this.conn.getMSC().commitTxn(txnToWriteId.getTxnId());
                    if (this.currentTxnIndex + 1 < this.txnToWriteIds.size()) {
                        this.minTxnId.set(this.txnToWriteIds.get(this.currentTxnIndex + 1).getTxnId());
                    } else {
                        this.minTxnId.set(-1L);
                    }
                    this.transactionLock.unlock();
                    this.state = TxnState.COMMITTED;
                    this.txnStatus[this.currentTxnIndex] = TxnState.COMMITTED;
                } catch (Throwable th) {
                    this.transactionLock.unlock();
                    throw th;
                }
            } catch (TxnAbortedException e) {
                throw new TransactionError("Aborted transaction cannot be committed", e);
            } catch (TException e2) {
                throw new TransactionError("Unable to commitTransaction transaction" + getCurrentTxnId(), e2);
            } catch (NoSuchTxnException e3) {
                throw new TransactionError("Invalid transaction id : " + getCurrentTxnId(), e3);
            }
        }

        void abort() throws StreamingException {
            if (this.isTxnClosed.get()) {
                return;
            }
            abort(false);
        }

        private void abort(boolean z) throws StreamingException {
            abortImpl(z);
        }

        private void abortImpl(boolean z) throws StreamingException {
            if (this.minTxnId == null) {
                return;
            }
            this.transactionLock.lock();
            try {
                try {
                    if (z) {
                        this.minTxnId.set(-1L);
                        this.currentTxnIndex = Math.max(this.currentTxnIndex + ((this.state == TxnState.ABORTED || this.state == TxnState.COMMITTED) ? 1 : 0), 0);
                        while (this.currentTxnIndex < this.txnToWriteIds.size()) {
                            this.conn.getMSC().rollbackTxn(this.txnToWriteIds.get(this.currentTxnIndex).getTxnId());
                            this.txnStatus[this.currentTxnIndex] = TxnState.ABORTED;
                            this.currentTxnIndex++;
                        }
                        this.currentTxnIndex--;
                    } else {
                        if (this.currentTxnIndex + 1 < this.txnToWriteIds.size()) {
                            this.minTxnId.set(this.txnToWriteIds.get(this.currentTxnIndex + 1).getTxnId());
                        } else {
                            this.minTxnId.set(-1L);
                        }
                        long currentTxnId = getCurrentTxnId();
                        if (currentTxnId > 0) {
                            this.conn.getMSC().rollbackTxn(currentTxnId);
                            this.txnStatus[this.currentTxnIndex] = TxnState.ABORTED;
                        }
                    }
                    this.state = TxnState.ABORTED;
                    this.transactionLock.unlock();
                } catch (NoSuchTxnException e) {
                    throw new TransactionError("Unable to abort invalid transaction id : " + getCurrentTxnId(), e);
                } catch (TException e2) {
                    throw new TransactionError("Unable to abort transaction id : " + getCurrentTxnId(), e2);
                }
            } catch (Throwable th) {
                this.transactionLock.unlock();
                throw th;
            }
        }

        public boolean isClosed() {
            return this.isTxnClosed.get();
        }

        public void close() throws StreamingException {
            if (this.isTxnClosed.get()) {
                return;
            }
            this.isTxnClosed.set(true);
            try {
                abort(true);
                try {
                    closeImpl();
                } catch (Exception e) {
                    HiveStreamingConnection.LOG.error("Fatal error on " + toString() + "; cause " + e.getMessage(), e);
                    throw new StreamingException("Unable to close", e);
                }
            } catch (Exception e2) {
                HiveStreamingConnection.LOG.error("Fatal error on " + toString() + "; cause " + e2.getMessage(), e2);
                throw new StreamingException("Unable to abort", e2);
            }
        }

        private void closeImpl() throws StreamingException {
            this.state = TxnState.INACTIVE;
            this.recordWriter.close();
            if (this.scheduledExecutorService != null) {
                this.scheduledExecutorService.shutdownNow();
            }
        }

        static LockRequest createLockRequest(HiveStreamingConnection hiveStreamingConnection, String str, String str2, long j, String str3) {
            LockRequestBuilder lockRequestBuilder = new LockRequestBuilder(str3);
            lockRequestBuilder.setUser(str2);
            lockRequestBuilder.setTransactionId(j);
            LockComponentBuilder operationType = new LockComponentBuilder().setDbName(hiveStreamingConnection.database).setTableName(hiveStreamingConnection.table).setShared().setOperationType(DataOperationType.INSERT);
            if (hiveStreamingConnection.isDynamicPartitioning()) {
                operationType.setIsDynamicPartitionWrite(true);
            }
            if (str != null && !str.isEmpty()) {
                operationType.setPartitionName(str);
            }
            lockRequestBuilder.addLockComponent(operationType.build());
            return lockRequestBuilder.build();
        }

        static {
            $assertionsDisabled = !HiveStreamingConnection.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection$TxnState.class */
    public enum TxnState {
        INACTIVE("I"),
        OPEN("O"),
        COMMITTED("C"),
        ABORTED("A");

        private final String code;

        TxnState(String str) {
            this.code = str;
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.code;
        }
    }

    private HiveStreamingConnection(Builder builder) throws StreamingException {
        this.isConnectionClosed = new AtomicBoolean(false);
        this.tableObject = null;
        this.database = builder.database.toLowerCase();
        this.table = builder.table.toLowerCase();
        this.staticPartitionValues = builder.staticPartitionValues;
        this.conf = builder.hiveConf;
        this.agentInfo = builder.agentInfo;
        this.streamingOptimizations = builder.streamingOptimizations;
        UserGroupInformation userGroupInformation = null;
        try {
            userGroupInformation = UserGroupInformation.getLoginUser();
        } catch (IOException e) {
            LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage());
        }
        if (userGroupInformation == null) {
            this.username = System.getProperty("user.name");
            this.secureMode = false;
        } else {
            this.username = userGroupInformation.getShortUserName();
            this.secureMode = userGroupInformation.hasKerberosCredentials();
        }
        this.transactionBatchSize = builder.transactionBatchSize;
        this.recordWriter = builder.recordWriter;
        this.connectionStats = new ConnectionStats();
        if (this.agentInfo == null) {
            try {
                this.agentInfo = this.username + ":" + InetAddress.getLocalHost().getHostName() + ":" + Thread.currentThread().getName();
            } catch (UnknownHostException e2) {
                this.agentInfo = UUID.randomUUID().toString();
            }
        }
        if (this.conf == null) {
            this.conf = createHiveConf(getClass(), DEFAULT_METASTORE_URI);
        }
        overrideConfSettings(this.conf);
        this.metastoreUri = this.conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
        this.msClient = getMetaStoreClient(this.conf, this.metastoreUri, this.secureMode, "streaming-connection");
        this.heartbeatMSClient = getMetaStoreClient(this.conf, this.metastoreUri, this.secureMode, "streaming-connection-heartbeat");
        validateTable();
        LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    private void setPartitionedTable(boolean z) {
        this.isPartitionedTable = z;
    }

    public String toString() {
        return "{ metaStoreUri: " + this.metastoreUri + ", database: " + this.database + ", table: " + this.table + " }";
    }

    private String toConnectionInfoString() {
        return "{ metastore-uri: " + this.metastoreUri + ", database: " + this.database + ", table: " + this.table + ", partitioned-table: " + isPartitionedTable() + ", dynamic-partitioning: " + isDynamicPartitioning() + ", username: " + this.username + ", secure-mode: " + this.secureMode + ", record-writer: " + this.recordWriter.getClass().getSimpleName() + ", agent-info: " + this.agentInfo + " }";
    }

    @VisibleForTesting
    String toTransactionString() {
        return this.currentTransactionBatch == null ? "" : this.currentTransactionBatch.toString();
    }

    @Override // org.apache.hive.streaming.PartitionHandler
    public PartitionInfo createPartitionIfNotExists(List<String> list) throws StreamingException {
        String str = null;
        String str2 = null;
        boolean z = false;
        try {
            Map makeSpecFromValues = Warehouse.makeSpecFromValues(this.tableObject.getPartitionKeys(), list);
            AddPartitionDesc addPartitionDesc = new AddPartitionDesc(this.database, this.table, true);
            str2 = Warehouse.makePartName(this.tableObject.getPartitionKeys(), list);
            str = new Path(this.tableObject.getDataLocation(), Warehouse.makePartPath(makeSpecFromValues)).toString();
            addPartitionDesc.addPartition(makeSpecFromValues, str);
            getMSC().add_partition(Hive.convertAddSpecToMetaPartition(this.tableObject, addPartitionDesc.getPartition(0), this.conf));
        } catch (AlreadyExistsException e) {
            z = true;
        } catch (HiveException | TException e2) {
            throw new StreamingException("Unable to creation partition for values: " + list + " connection: " + toConnectionInfoString(), e2);
        }
        return new PartitionInfo(str2, str, z);
    }

    IMetaStoreClient getMSC() {
        this.connectionStats.incrementMetastoreCalls();
        return this.msClient;
    }

    IMetaStoreClient getHeatbeatMSC() {
        this.connectionStats.incrementMetastoreCalls();
        return this.heartbeatMSClient;
    }

    private void validateTable() throws InvalidTable, ConnectionError {
        try {
            this.tableObject = new Table(getMSC().getTable(this.database, this.table));
            if (!AcidUtils.isFullAcidTable(this.tableObject)) {
                LOG.error("HiveEndPoint " + this + " must use an acid table");
                throw new InvalidTable(this.database, this.table, "is not an Acid table");
            }
            if (this.tableObject.getPartitionKeys() == null || this.tableObject.getPartitionKeys().isEmpty()) {
                setPartitionedTable(false);
            } else {
                setPartitionedTable(true);
            }
            if (isPartitionedTable() || this.staticPartitionValues == null || this.staticPartitionValues.isEmpty()) {
                return;
            }
            String str = toString() + " specifies partitions for un-partitioned table";
            LOG.error(str);
            throw new ConnectionError(str);
        } catch (Exception e) {
            LOG.warn("Unable to validate the table for connection: " + toConnectionInfoString(), e);
            throw new InvalidTable(this.database, this.table, e);
        }
    }

    private void beginNextTransaction() throws StreamingException {
        if (this.currentTransactionBatch == null) {
            this.currentTransactionBatch = createNewTransactionBatch();
            LOG.info("Opened new transaction batch {}", this.currentTransactionBatch);
        }
        if (this.currentTransactionBatch.isClosed()) {
            throw new StreamingException("Cannot begin next transaction on a closed streaming connection");
        }
        if (this.currentTransactionBatch.remainingTransactions() == 0) {
            LOG.info("Transaction batch {} is done. Rolling over to next transaction batch.", this.currentTransactionBatch);
            this.currentTransactionBatch.close();
            this.currentTransactionBatch = createNewTransactionBatch();
            LOG.info("Rolled over to new transaction batch {}", this.currentTransactionBatch);
        }
        this.currentTransactionBatch.beginNextTransaction();
    }

    private TransactionBatch createNewTransactionBatch() throws StreamingException {
        return new TransactionBatch();
    }

    private void checkClosedState() throws StreamingException {
        if (this.isConnectionClosed.get()) {
            throw new StreamingException("Streaming connection is closed already.");
        }
    }

    private void checkState() throws StreamingException {
        checkClosedState();
        if (this.currentTransactionBatch == null) {
            throw new StreamingException("Transaction batch is null. Missing beginTransaction?");
        }
        if (this.currentTransactionBatch.state != TxnState.OPEN) {
            throw new StreamingException("Transaction state is not OPEN. Missing beginTransaction?");
        }
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void beginTransaction() throws StreamingException {
        checkClosedState();
        beginNextTransaction();
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void commitTransaction() throws StreamingException {
        checkState();
        this.currentTransactionBatch.commit();
        this.connectionStats.incrementCommittedTransactions();
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void abortTransaction() throws StreamingException {
        checkState();
        this.currentTransactionBatch.abort();
        this.connectionStats.incrementAbortedTransactions();
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void write(byte[] bArr) throws StreamingException {
        checkState();
        this.currentTransactionBatch.write(bArr);
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void write(InputStream inputStream) throws StreamingException {
        checkState();
        this.currentTransactionBatch.write(inputStream);
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void close() {
        if (this.isConnectionClosed.get()) {
            return;
        }
        this.isConnectionClosed.set(true);
        try {
            if (this.currentTransactionBatch != null) {
                this.currentTransactionBatch.close();
            }
        } catch (StreamingException e) {
            LOG.warn("Unable to close current transaction batch: " + this.currentTransactionBatch, e);
        } finally {
            getMSC().close();
            getHeatbeatMSC().close();
        }
        LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public ConnectionStats getConnectionStats() {
        return this.connectionStats;
    }

    private static IMetaStoreClient getMetaStoreClient(HiveConf hiveConf, String str, boolean z, String str2) throws ConnectionError {
        if (str != null) {
            hiveConf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), str);
        }
        if (z) {
            hiveConf.setBoolean(MetastoreConf.ConfVars.USE_THRIFT_SASL.getHiveName(), true);
        }
        try {
            LOG.info("Creating metastore client for {}", str2);
            return HiveMetaStoreUtils.getHiveMetastoreClient(hiveConf);
        } catch (MetaException | IOException e) {
            throw new ConnectionError("Error connecting to Hive Metastore URI: " + str + ". " + e.getMessage(), (Exception) e);
        }
    }

    @VisibleForTesting
    TxnState getCurrentTransactionState() {
        return this.currentTransactionBatch.getCurrentTransactionState();
    }

    @VisibleForTesting
    int remainingTransactions() {
        return this.currentTransactionBatch.remainingTransactions();
    }

    @VisibleForTesting
    Long getCurrentTxnId() {
        return Long.valueOf(this.currentTransactionBatch.getCurrentTxnId());
    }

    private HiveConf createHiveConf(Class<?> cls, String str) {
        HiveConf hiveConf = new HiveConf(cls);
        if (str != null) {
            hiveConf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), str);
        }
        return hiveConf;
    }

    private void overrideConfSettings(HiveConf hiveConf) {
        setHiveConf(hiveConf, HiveConf.ConfVars.HIVE_TXN_MANAGER, DbTxnManager.class.getName());
        setHiveConf(hiveConf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        setHiveConf(hiveConf, MetastoreConf.ConfVars.EXECUTE_SET_UGI.getHiveName());
        setHiveConf(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
        setHiveConf(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
        if (this.streamingOptimizations) {
            setHiveConf(hiveConf, HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED, true);
        }
        setHiveConf(hiveConf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_ENABLED, false);
    }

    private static void setHiveConf(HiveConf hiveConf, HiveConf.ConfVars confVars, String str) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Overriding HiveConf setting : " + confVars + " = " + str);
        }
        hiveConf.setVar(confVars, str);
    }

    private static void setHiveConf(HiveConf hiveConf, HiveConf.ConfVars confVars, boolean z) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Overriding HiveConf setting : " + confVars + " = " + z);
        }
        hiveConf.setBoolVar(confVars, true);
    }

    private static void setHiveConf(HiveConf hiveConf, String str) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Overriding HiveConf setting : " + str + " = true");
        }
        hiveConf.setBoolean(str, true);
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public HiveConf getHiveConf() {
        return this.conf;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public String getMetastoreUri() {
        return this.metastoreUri;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public Table getTable() {
        return this.tableObject;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public List<String> getStaticPartitionValues() {
        return this.staticPartitionValues;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public String getAgentInfo() {
        return this.agentInfo;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public boolean isPartitionedTable() {
        return this.isPartitionedTable;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public boolean isDynamicPartitioning() {
        return isPartitionedTable() && (this.staticPartitionValues == null || this.staticPartitionValues.isEmpty());
    }
}
