/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConversions;

public class DeltaSync
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
    // Delta streamer configuration (target base path, table type, operation, checkpoint, retry settings, ...).
    private final HoodieDeltaStreamer.Config cfg;
    // Adapter around the configured source; created in the constructor. Transient: not serialized with this object.
    private transient SourceFormatAdapter formatAdapter;
    // Schema provider handed to the constructor by the caller; may be null (it is null-checked before use).
    private transient SchemaProvider userProvidedSchemaProvider;
    // Schema provider of the most recently fetched batch; assigned in syncOnce() before the write client is set up.
    private transient SchemaProvider schemaProvider;
    // Optional row transformer built from cfg.transformerClassNames.
    private transient Option<Transformer> transformer;
    // Non-transient: it is referenced from the record-mapping lambda that runs inside a Spark RDD map.
    private KeyGenerator keyGenerator;
    private transient FileSystem fs;
    private transient JavaSparkContext jssc;
    private transient SparkSession sparkSession;
    // Configuration passed to the meta-sync helpers (see runMetaSync()).
    private transient Configuration conf;
    // Writer/source properties; also read inside the record-mapping lambda, hence non-transient.
    private final TypedProperties props;
    // Callback supplied by the caller; presumably invoked when a write client is (re)created — TODO confirm, not used in the visible code.
    private transient java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient;
    // Completed commits (COPY_ON_WRITE) or completed delta commits (MERGE_ON_READ); refreshed by refreshTimeline().
    private transient Option<HoodieTimeline> commitTimelineOpt;
    // All commits of the table, including pending ones; used to look up pending clustering instants.
    private transient Option<HoodieTimeline> allCommitsTimelineOpt;
    // Source/target schemas that have already been seen, so a new schema triggers a write-client re-init.
    private final SchemaSet processedSchema;
    private transient Option<EmbeddedTimelineService> embeddedTimelineService = Option.empty();
    // Lazily created write client; null until the first batch has been read (see syncOnce()).
    private transient SparkRDDWriteClient writeClient;
    private transient HoodieDeltaStreamerMetrics metrics;
    private transient HoodieMetrics hoodieMetrics;

    /**
     * Creates a delta sync bound to one target table.
     *
     * <p>Besides storing its arguments, the constructor creates the key generator, refreshes
     * (or initializes) the table timeline, registers the Avro schemas of the supplied schema
     * provider, and builds the transformer, the metrics objects and the source adapter.
     *
     * @param cfg delta streamer configuration
     * @param sparkSession Spark session handed to the source
     * @param schemaProvider user-provided schema provider, stored as {@code userProvidedSchemaProvider}
     * @param props properties used for key generation, table initialization and the source
     * @param jssc Spark context
     * @param fs file system holding the target table
     * @param conf configuration later passed to meta sync
     * @param onInitializingHoodieWriteClient callback stored for later use
     * @throws IOException if refreshing the timeline or creating the key generator fails
     */
    public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
        this.cfg = cfg;
        this.jssc = jssc;
        this.sparkSession = sparkSession;
        this.fs = fs;
        this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
        this.props = props;
        this.userProvidedSchemaProvider = schemaProvider;
        this.processedSchema = new SchemaSet();
        this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)props);
        // refreshTimeline() may initialize an empty table, which reads cfg, props, jssc and keyGenerator,
        // so it has to run after the assignments above.
        this.refreshTimeline();
        this.registerAvroSchemas(schemaProvider);
        this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
        // NOTE(review): this.schemaProvider is not assigned anywhere above in this constructor; unless
        // registerAvroSchemas(...) sets it, the two configs below are built from a null provider — confirm.
        this.metrics = new HoodieDeltaStreamerMetrics(this.getHoodieClientConfig(this.schemaProvider));
        this.hoodieMetrics = new HoodieMetrics(this.getHoodieClientConfig(this.schemaProvider));
        this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, this.metrics));
        this.conf = conf;
    }

    /**
     * Reloads the cached commit timelines of the target table.
     *
     * <p>If the base path does not exist, an empty table is initialized. Otherwise the completed
     * commit timeline (copy-on-write) or completed delta-commit timeline (merge-on-read) and the
     * all-commits timeline are cached on this instance. A failure to load the table properties is
     * handed to {@code recoverFromUnloadableTableProperties}.
     *
     * @throws IOException if a file-system call fails
     */
    public void refreshTimeline() throws IOException {
        if (!this.fs.exists(new Path(this.cfg.targetBasePath))) {
            // Nothing on disk yet: start from an empty table.
            this.initializeEmptyTable();
            return;
        }
        try {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
                    .setConf(new Configuration(this.fs.getConf()))
                    .setBasePath(this.cfg.targetBasePath)
                    .setPayloadClassName(this.cfg.payloadClassName)
                    .build();
            switch (meta.getTableType()) {
                case COPY_ON_WRITE:
                    this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
                    this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
                    break;
                case MERGE_ON_READ:
                    this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
                    this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
                    break;
                default:
                    throw new HoodieException("Unsupported table type :" + meta.getTableType());
            }
        } catch (HoodieIOException e) {
            this.recoverFromUnloadableTableProperties(e);
        }
    }

    /**
     * Handles a {@link HoodieIOException} raised while loading the table in {@link #refreshTimeline()}.
     *
     * <p>Anything other than a "could not load hoodie.properties" failure is rethrown. If the base
     * path, {@code hoodie.properties} and its backup all exist, nothing is done. Otherwise the table
     * is re-initialized; if the re-initialized table turns out to have instants, the recreated
     * properties file is deleted and the pipeline is failed.
     */
    private void recoverFromUnloadableTableProperties(HoodieIOException e) throws IOException {
        LOG.warn("Full exception msg " + e.getMessage());
        boolean propertiesLoadFailure = e.getMessage().contains("Could not load Hoodie properties")
                && e.getMessage().contains("hoodie.properties");
        if (!propertiesLoadFailure) {
            throw e;
        }

        String basePathWithForwardSlash = this.cfg.targetBasePath.endsWith("/")
                ? this.cfg.targetBasePath
                : String.format("%s/", this.cfg.targetBasePath);
        String pathToHoodieProps = String.format("%s%s/%s", basePathWithForwardSlash, ".hoodie", "hoodie.properties");
        String pathToHoodiePropsBackup = String.format("%s%s/%s", basePathWithForwardSlash, ".hoodie", "hoodie.properties.backup");

        boolean tableFilesPresent = this.fs.exists(new Path(basePathWithForwardSlash))
                && this.fs.exists(new Path(pathToHoodieProps))
                && this.fs.exists(new Path(pathToHoodiePropsBackup));
        if (tableFilesPresent) {
            return;
        }

        LOG.warn("Base path exists, but table is not fully initialized. Re-initializing again");
        this.initializeEmptyTable();

        // A table that was only half-created must have no instants; if it has some, the
        // properties file was removed from a live table and we must not silently recreate it.
        HoodieTableMetaClient metaClientToValidate = HoodieTableMetaClient.builder()
                .setConf(new Configuration(this.fs.getConf()))
                .setBasePath(this.cfg.targetBasePath)
                .build();
        boolean tableHasInstants = metaClientToValidate.reloadActiveTimeline().getInstants().count() > 0L;
        if (tableHasInstants) {
            this.fs.delete(new Path(pathToHoodieProps));
            throw new HoodieIOException("hoodie.properties is missing. Likely due to some external entity. Please populate the hoodie.properties and restart the pipeline. ", e.getIOException());
        }
    }

    /**
     * Resets both cached timelines to empty and initializes the table at {@code cfg.targetBasePath}
     * from the streamer configuration and properties.
     *
     * @throws IOException if the table cannot be initialized
     */
    private void initializeEmptyTable() throws IOException {
        // A freshly created table has no commits, so both cached timelines start out empty.
        this.commitTimelineOpt = Option.empty();
        this.allCommitsTimelineOpt = Option.empty();

        String partitionColumns = SparkKeyGenUtils.getPartitionColumns(this.keyGenerator, this.props);
        String recordKeyFields = this.props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key());
        String keyGeneratorClass = this.props.getProperty(
                DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName());

        HoodieTableMetaClient.withPropertyBuilder()
                .setTableType(this.cfg.tableType)
                .setTableName(this.cfg.targetTableName)
                .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
                .setPayloadClassName(this.cfg.payloadClassName)
                .setBaseFileFormat(this.cfg.baseFileFormat)
                .setPartitionFields(partitionColumns)
                .setRecordKeyFields(recordKeyFields)
                .setPopulateMetaFields(this.props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
                .setKeyGeneratorClassProp(keyGeneratorClass)
                .setPreCombineField(this.cfg.sourceOrderingField)
                .setPartitionMetafileUseBaseFormat(this.props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
                .setShouldDropPartitionColumns(this.isDropPartitionColumns())
                .initTable(new Configuration(this.jssc.hadoopConfiguration()), this.cfg.targetBasePath);
    }

    /**
     * Runs one sync round: refreshes the timeline, reads the next batch from the source and, when
     * a batch was returned, writes it to the table.
     *
     * <p>The write client is created on the first batch and re-created whenever the batch carries a
     * source or target schema that has not been processed before. A pending clustering instant is
     * re-run before writing when that is enabled in the configuration.
     *
     * @return the result of {@code writeToSink}, or {@code null} when the source returned nothing to sync
     * @throws IOException if refreshing the timeline, reading the source or setting up the client fails
     */
    public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException {
        Timer.Context overallTimerContext = this.metrics.getOverallTimerContext();
        this.refreshTimeline();

        Pair<Option<String>, JavaRDD<WriteStatus>> result = null;
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = this.readFromSource(this.commitTimelineOpt);
        if (srcRecordsWithCkpt != null) {
            SchemaProvider batchSchemaProvider = srcRecordsWithCkpt.getKey();
            if (this.writeClient == null) {
                // First batch: remember its schema provider and build the write client from it.
                this.schemaProvider = batchSchemaProvider;
                this.setupWriteClient();
            } else {
                Schema newSourceSchema = batchSchemaProvider.getSourceSchema();
                Schema newTargetSchema = batchSchemaProvider.getTargetSchema();
                boolean schemasAlreadyProcessed = this.processedSchema.isSchemaPresent(newSourceSchema)
                        && this.processedSchema.isSchemaPresent(newTargetSchema);
                if (!schemasAlreadyProcessed) {
                    LOG.info("Seeing new schema. Source :" + newSourceSchema.toString(true) + ", Target :" + newTargetSchema.toString(true));
                    this.reInitWriteClient(newSourceSchema, newTargetSchema);
                    this.processedSchema.addSchema(newSourceSchema);
                    this.processedSchema.addSchema(newTargetSchema);
                }
            }

            if (this.cfg.retryLastPendingInlineClusteringJob
                    && this.getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
                Option<String> pendingClusteringInstant = this.getLastPendingClusteringInstant(this.allCommitsTimelineOpt);
                if (pendingClusteringInstant.isPresent()) {
                    this.writeClient.cluster(pendingClusteringInstant.get(), true);
                }
            }

            Pair<String, JavaRDD<HoodieRecord>> checkpointAndRecords = srcRecordsWithCkpt.getRight();
            JavaRDD<HoodieRecord> records = checkpointAndRecords.getRight();
            String checkpoint = checkpointAndRecords.getLeft();
            result = this.writeToSink(records, checkpoint, this.metrics, overallTimerContext);
        }

        this.metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis());
        // Release every RDD that is still persisted on the Spark context.
        this.jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
        return result;
    }

    /**
     * Returns the timestamp of the last instant of the pending-replace timeline, if any.
     *
     * @param commitTimelineOpt timeline to inspect; may be empty
     * @return the instant timestamp, or an empty option when there is no timeline or no such instant
     */
    private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> commitTimelineOpt) {
        if (!commitTimelineOpt.isPresent()) {
            return Option.empty();
        }
        Option<HoodieInstant> lastPending = commitTimelineOpt.get().filterPendingReplaceTimeline().lastInstant();
        if (!lastPending.isPresent()) {
            return Option.empty();
        }
        return Option.of(lastPending.get().getTimestamp());
    }

    /**
     * Determines the checkpoint to resume from and fetches the next batch from the source.
     *
     * <p>When no commit timeline is available the table is initialized first, with the same
     * settings (including the pre-combine field) that {@code initializeEmptyTable()} uses, so the
     * table is configured the same way whichever path runs. The resume checkpoint comes from the
     * timeline when available and falls back to {@code cfg.checkpoint}.
     *
     * <p>Only {@link HoodieSourceTimeoutException} triggers a retry. A {@code null} result from
     * {@code fetchFromSource} means "nothing to sync" and is returned immediately instead of
     * being fetched again.
     *
     * @param commitTimelineOpt completed commit timeline of the table, or empty for a new table
     * @return schema provider plus (checkpoint, records) for the batch, or {@code null} when there is
     *         nothing to sync
     * @throws IOException if the checkpoint lookup or the table initialization fails
     * @throws HoodieSourceTimeoutException if the source still times out on the last allowed attempt
     */
    public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
        Option<String> resumeCheckpointStr = Option.empty();
        if (commitTimelineOpt.isPresent()) {
            resumeCheckpointStr = this.getCheckpointToResume(commitTimelineOpt);
        } else {
            // Initialize the table for the first time.
            String partitionColumns = SparkKeyGenUtils.getPartitionColumns(this.keyGenerator, this.props);
            HoodieTableMetaClient.withPropertyBuilder()
                    .setTableType(this.cfg.tableType)
                    .setTableName(this.cfg.targetTableName)
                    .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
                    .setPayloadClassName(this.cfg.payloadClassName)
                    .setBaseFileFormat(this.cfg.baseFileFormat)
                    .setPartitionFields(partitionColumns)
                    .setRecordKeyFields(this.props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
                    .setPopulateMetaFields(this.props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
                    .setKeyGeneratorClassProp(this.props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName()))
                    // Keep in line with initializeEmptyTable(): without this the table config would
                    // lose the pre-combine field whenever this path initializes the table.
                    .setPreCombineField(this.cfg.sourceOrderingField)
                    .setPartitionMetafileUseBaseFormat(this.props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
                    .setShouldDropPartitionColumns(this.isDropPartitionColumns())
                    .initTable(new Configuration(this.jssc.hadoopConfiguration()), this.cfg.targetBasePath);
        }

        LOG.debug("Checkpoint from config: " + this.cfg.checkpoint);
        if (!resumeCheckpointStr.isPresent() && this.cfg.checkpoint != null) {
            resumeCheckpointStr = Option.of(this.cfg.checkpoint);
        }
        LOG.info("Checkpoint to resume from : " + resumeCheckpointStr);

        int maxRetryCount = this.cfg.retryOnSourceFailures ? this.cfg.maxRetryCount : 1;
        for (int curRetryCount = 1; curRetryCount <= maxRetryCount; curRetryCount++) {
            try {
                // Return whatever the source produced, including null ("no new data"); only a
                // timeout is a reason to try again.
                return this.fetchFromSource(resumeCheckpointStr);
            } catch (HoodieSourceTimeoutException e) {
                if (curRetryCount >= maxRetryCount) {
                    throw e;
                }
                LOG.error("Exception thrown while fetching data from source. Msg : " + e.getMessage() + ", class : " + e.getClass() + ", cause : " + e.getCause());
                LOG.error("Sleeping for " + this.cfg.retryIntervalSecs + " before retrying again. Current retry count " + curRetryCount + ", max retry count " + this.cfg.maxRetryCount);
                try {
                    // Long arithmetic so large intervals cannot overflow int.
                    Thread.sleep(this.cfg.retryIntervalSecs * 1000L);
                } catch (InterruptedException ex) {
                    LOG.error("Ignoring InterruptedException while waiting to retry on source failure " + e.getMessage());
                    // Keep retrying as before, but do not lose the interruption request.
                    Thread.currentThread().interrupt();
                }
            }
        }
        // Only reachable when the configured retry count allows no attempt at all.
        return null;
    }

    /**
     * Fetches the next batch from the source and converts it into Hoodie records.
     *
     * <p>With a transformer configured, the batch is fetched in Row format, transformed and then
     * converted to an Avro RDD; otherwise it is fetched in Avro format directly.
     *
     * @param resumeCheckpointStr checkpoint to resume from, or empty
     * @return schema provider plus (next checkpoint, records); the records are an empty RDD when the
     *         batch is absent or empty; {@code null} when the checkpoint did not change and
     *         {@code cfg.allowCommitOnNoCheckpointChange} is false
     */
    private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource(Option<String> resumeCheckpointStr) {
        // NOTE(review): several declared generic types below (e.g. Option<Dataset<Row>> holding an Avro
        // JavaRDD) look like decompiler artifacts rather than the intended types — confirm against the
        // original source before relying on them.
        SchemaProvider schemaProvider;
        Option<Dataset<Row>> avroRDDOptional;
        String checkpointStr;
        InputBatch<Dataset<Row>> dataAndCheckpoint;
        if (this.transformer.isPresent()) {
            // Transformation is needed: fetch in Row format, transform, then convert to Avro.
            dataAndCheckpoint = this.formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, this.cfg.sourceLimit);
            Option<Dataset> transformed = dataAndCheckpoint.getBatch().map(data -> this.transformer.get().apply(this.jssc, this.sparkSession, (Dataset<Row>)data, this.props));
            checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
            boolean reconcileSchema = this.props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
            if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
                // A user-provided target schema exists: use it for the Row-to-Avro conversion.
                avroRDDOptional = transformed.map(t -> HoodieSparkUtils.createRdd((Dataset)t, (String)"hoodie_source", (String)"hoodie.source", (boolean)reconcileSchema, Option.of(this.userProvidedSchemaProvider.getTargetSchema())).toJavaRDD());
                schemaProvider = this.userProvidedSchemaProvider;
            } else {
                // No user-provided target schema: derive the target schema provider from the transformed
                // rows (latest-schema provider when reconciling, row-based provider otherwise); fall back
                // to the batch's own provider when there is no transformed data.
                schemaProvider = transformed.map(r -> {
                    SchemaProvider targetSchemaProvider = null;
                    targetSchemaProvider = reconcileSchema ? UtilHelpers.createLatestSchemaProvider(r.schema(), this.jssc, this.fs, this.cfg.targetBasePath) : UtilHelpers.createRowBasedSchemaProvider(r.schema(), this.props, this.jssc);
                    return new DelegatingSchemaProvider(this.props, this.jssc, dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider);
                }).orElse(dataAndCheckpoint.getSchemaProvider());
                avroRDDOptional = transformed.map(t -> HoodieSparkUtils.createRdd((Dataset)t, (String)"hoodie_source", (String)"hoodie.source", (boolean)reconcileSchema, Option.ofNullable(schemaProvider.getTargetSchema())).toJavaRDD());
            }
        } else {
            // No transformer: pull the data from the source in Avro format as-is.
            dataAndCheckpoint = this.formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, this.cfg.sourceLimit);
            avroRDDOptional = dataAndCheckpoint.getBatch();
            checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
            schemaProvider = dataAndCheckpoint.getSchemaProvider();
        }
        // Unchanged checkpoint means nothing new arrived; record the empty round in the metrics and
        // signal "nothing to commit" with null.
        if (!this.cfg.allowCommitOnNoCheckpointChange.booleanValue() && Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
            LOG.info((Object)("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"));
            String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
            this.hoodieMetrics.updateMetricsForEmptyData(commitActionType);
            return null;
        }
        this.jssc.setJobGroup(this.getClass().getSimpleName(), "Checking if input is empty");
        // The checkpoint moved but there are no records: return an empty RDD so the caller can still commit.
        if (!avroRDDOptional.isPresent() || ((JavaRDD)avroRDDOptional.get()).isEmpty()) {
            LOG.info((Object)"No new data, perform empty commit.");
            return Pair.of(schemaProvider, Pair.of(checkpointStr, this.jssc.emptyRDD()));
        }
        // An ordering value is only attached to the payload when duplicates are filtered or the operation is an upsert.
        boolean shouldCombine = this.cfg.filterDupes != false || this.cfg.operation.equals((Object)WriteOperationType.UPSERT);
        Set<String> partitionColumns = this.getPartitionColumns(this.keyGenerator, this.props);
        JavaRDD avroRDD = (JavaRDD)avroRDDOptional.get();
        JavaRDD records = avroRDD.map((Function & Serializable)record -> {
            // The payload is built from the record with partition columns removed (when configured),
            // while the key below is generated from the original, unmodified record.
            GenericRecord gr = this.isDropPartitionColumns() != false ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
            HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload((String)this.cfg.payloadClassName, (GenericRecord)gr, (Comparable)((Comparable)HoodieAvroUtils.getNestedFieldVal(gr, this.cfg.sourceOrderingField, false, this.props.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))) : DataSourceUtils.createPayload((String)this.cfg.payloadClassName, (GenericRecord)gr);
            return new HoodieAvroRecord<HoodieRecordPayload>(this.keyGenerator.getKey((GenericRecord)record), payload);
        });
        return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
    }

    /**
     * Works out which checkpoint the next read should resume from.
     *
     * <p>The configured checkpoint wins when it differs from (or there is no) reset key stored in the
     * latest commit metadata that carries checkpoint info; otherwise the stored checkpoint is used.
     * When neither applies and the last commit is later than {@code 00000000000002}, the table is
     * considered not to have been written by the delta streamer and an exception is thrown.
     *
     * @param commitTimelineOpt commit timeline; must be present
     * @return the checkpoint to resume from, or empty when none could be determined
     * @throws IOException if commit metadata cannot be read or rendered
     */
    private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
        HoodieTimeline timeline = commitTimelineOpt.get();
        Option<HoodieInstant> lastCommit = timeline.lastInstant();
        if (!lastCommit.isPresent()) {
            return Option.empty();
        }

        Option<HoodieCommitMetadata> commitMetadataOption = this.getLatestCommitMetadataWithValidCheckpointInfo(timeline);
        if (!commitMetadataOption.isPresent()) {
            // No commit carries checkpoint info: only the configured checkpoint can be used.
            if (this.cfg.checkpoint != null) {
                return Option.of(this.cfg.checkpoint);
            }
            return Option.empty();
        }

        HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
        String storedResetKey = commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key");
        String storedCheckpoint = commitMetadata.getMetadata("deltastreamer.checkpoint.key");
        boolean resetKeyMissing = StringUtils.isNullOrEmpty(storedResetKey);
        LOG.debug("Checkpoint reset from metadata: " + storedResetKey);

        Option<String> resumeCheckpointStr = Option.empty();
        boolean configuredCheckpointIsNew = this.cfg.checkpoint != null
                && (resetKeyMissing || !this.cfg.checkpoint.equals(storedResetKey));
        if (configuredCheckpointIsNew) {
            resumeCheckpointStr = Option.of(this.cfg.checkpoint);
        } else if (!StringUtils.isNullOrEmpty(storedCheckpoint)) {
            resumeCheckpointStr = Option.of(storedCheckpoint);
        } else if (HoodieTimeline.compareTimestamps("00000000000002", HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
            throw new HoodieDeltaStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" + timeline.getInstants().collect(Collectors.toList()) + ", CommitMetadata=" + commitMetadata.toJsonString());
        }

        if (!resetKeyMissing) {
            // A reset key was stored with the commit: drop the Kafka checkpoint-type property.
            this.props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
        }
        return resumeCheckpointStr;
    }

    /**
     * Walks the timeline from the newest instant backwards and returns the metadata of the first
     * commit that stores a checkpoint key or a checkpoint reset key.
     *
     * @param timeline timeline to search
     * @return the matching commit metadata, or empty when no instant carries checkpoint info
     * @throws HoodieIOException if the metadata of an inspected instant cannot be parsed
     */
    protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
        // Iterating the stream lazily keeps the original behaviour: older instants are not
        // read once a match has been found.
        Iterable<HoodieInstant> newestFirst = timeline.getReverseOrderedInstants()::iterator;
        for (HoodieInstant instant : newestFirst) {
            HoodieCommitMetadata commitMetadata;
            try {
                commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
            } catch (IOException e) {
                throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
            }
            boolean hasCheckpoint = !StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.key"));
            if (hasCheckpoint || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key"))) {
                return Option.of(commitMetadata);
            }
        }
        return Option.empty();
    }

    /**
     * Writes one batch of records to the table and commits it.
     *
     * <p>Steps, in order: optionally drop duplicates, start a commit, run the configured write
     * operation, count error records, then either roll back (errors present and
     * {@code cfg.commitOnErrors} is false) or commit with the checkpoint stored in the commit
     * metadata. After a successful commit the source is notified, compaction is scheduled when
     * async compaction is enabled, meta sync runs for non-empty batches (or when forced), and the
     * overall timer is reported.
     *
     * @param records records to write
     * @param checkpointStr checkpoint to store with the commit; may be null
     * @param metrics metrics sink for the overall sync time
     * @param overallTimerContext timer started by the caller; may be null
     * @return the scheduled compaction instant (empty unless async compaction is enabled) and the write statuses
     * @throws HoodieException if the write had errors and was rolled back, or if the commit failed
     * @throws HoodieDeltaStreamerException if {@code cfg.operation} is not one of the handled operations
     */
    private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr, HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) {
        boolean hasErrors;
        JavaRDD writeStatusRDD;
        Option scheduledCompactionInstant = Option.empty();
        if (this.cfg.filterDupes.booleanValue()) {
            records = DataSourceUtils.dropDuplicates((JavaSparkContext)this.jssc, (JavaRDD)records, (HoodieWriteConfig)this.writeClient.getConfig());
        }
        // Captured before writing; decides further down whether meta sync is worth running.
        boolean isEmpty = records.isEmpty();
        String instantTime = this.startCommit();
        LOG.info((Object)("Starting commit  : " + instantTime));
        switch (this.cfg.operation) {
            case INSERT: {
                writeStatusRDD = this.writeClient.insert(records, instantTime);
                break;
            }
            case UPSERT: {
                writeStatusRDD = this.writeClient.upsert(records, instantTime);
                break;
            }
            case BULK_INSERT: {
                writeStatusRDD = this.writeClient.bulkInsert(records, instantTime);
                break;
            }
            case INSERT_OVERWRITE: {
                writeStatusRDD = this.writeClient.insertOverwrite(records, instantTime).getWriteStatuses();
                break;
            }
            case INSERT_OVERWRITE_TABLE: {
                writeStatusRDD = this.writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
                break;
            }
            case DELETE_PARTITION: {
                // The partitions to delete are the distinct partition paths of the incoming records.
                List partitions = records.map((Function & Serializable)record -> record.getPartitionPath()).distinct().collect();
                writeStatusRDD = this.writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
                break;
            }
            default: {
                throw new HoodieDeltaStreamerException("Unknown operation : " + (Object)((Object)this.cfg.operation));
            }
        }
        long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
        long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
        boolean bl = hasErrors = totalErrorRecords > 0L;
        // Errors without commitOnErrors: log a sample of them, roll the instant back and fail.
        if (hasErrors && !this.cfg.commitOnErrors.booleanValue()) {
            LOG.error((Object)("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords));
            LOG.error((Object)"Printing out the top 100 errors");
            writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
                LOG.error((Object)"Global error :", ws.getGlobalError());
                if (ws.getErrors().size() > 0) {
                    // NOTE(review): per-key errors are logged at TRACE, so they are invisible at the
                    // ERROR level used by the surrounding messages — confirm this is intended.
                    ws.getErrors().forEach((key, value) -> LOG.trace((Object)("Error for key:" + key + " is " + value)));
                }
            });
            this.writeClient.rollback(instantTime);
            throw new HoodieException("Commit " + instantTime + " failed and rolled-back !");
        }
        // Extra commit metadata: the checkpoint for the next batch and, when configured, the reset key.
        HashMap<String, String> checkpointCommitMetadata = new HashMap<String, String>();
        if (checkpointStr != null) {
            checkpointCommitMetadata.put("deltastreamer.checkpoint.key", checkpointStr);
        }
        if (this.cfg.checkpoint != null) {
            checkpointCommitMetadata.put("deltastreamer.checkpoint.reset_key", this.cfg.checkpoint);
        }
        if (hasErrors) {
            LOG.warn((Object)("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + totalErrorRecords + "/" + totalRecords));
        }
        String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
        boolean success = this.writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap());
        if (!success) {
            LOG.info((Object)("Commit " + instantTime + " failed!"));
            throw new HoodieException("Commit " + instantTime + " failed!");
        }
        LOG.info((Object)("Commit " + instantTime + " successful!"));
        // Let the source know the checkpoint has been committed.
        this.formatAdapter.getSource().onCommit(checkpointStr);
        if (this.cfg.isAsyncCompactionEnabled()) {
            scheduledCompactionInstant = this.writeClient.scheduleCompaction(Option.empty());
        }
        // Meta sync is skipped for empty batches unless explicitly forced.
        if (!isEmpty || this.cfg.forceEmptyMetaSync.booleanValue()) {
            this.runMetaSync();
        }
        long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0L;
        metrics.updateDeltaStreamerMetrics(overallTimeMs);
        return Pair.of(scheduledCompactionInstant, writeStatusRDD);
    }

    /**
     * Starts a new commit on the write client and returns its instant time.
     *
     * <p>An {@link IllegalArgumentException} from the write client is retried once after a
     * one-second pause; the pause is skipped after the final attempt because nothing follows it.
     *
     * @return the instant time of the started commit
     * @throws IllegalArgumentException the last failure, when every attempt failed
     */
    private String startCommit() {
        final int maxRetries = 2;
        IllegalArgumentException lastException = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                String instantTime = HoodieActiveTimeline.createNewInstantTime();
                String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
                this.writeClient.startCommitWithTime(instantTime, commitActionType);
                return instantTime;
            } catch (IllegalArgumentException ie) {
                lastException = ie;
                if (attempt == maxRetries) {
                    LOG.error("Got error trying to start a new commit. Giving up after " + maxRetries + " attempts", ie);
                    break;
                }
                LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException interrupted) {
                    // Still make the remaining attempt, but keep the interruption visible to callers.
                    Thread.currentThread().interrupt();
                }
            }
        }
        throw lastException;
    }

    /**
     * Returns the part of {@code syncClassName} after its last dot, or the whole name when it
     * contains no dot.
     */
    private String getSyncClassShortName(String syncClassName) {
        int lastDot = syncClassName.lastIndexOf('.');
        int simpleNameStart = lastDot + 1;
        return syncClassName.substring(simpleNameStart);
    }

    /**
     * Runs every configured meta sync tool against the target table and records how long each took.
     *
     * <p>Tool class names come from the comma-separated {@code cfg.syncClientToolClassNames}. They
     * are trimmed before being de-duplicated, so the same tool listed twice with different
     * whitespace runs only once and metric names carry no stray spaces. When
     * {@code cfg.enableHiveSync} is set, meta sync is switched on and {@link HiveSyncTool} is added
     * for backward compatibility. Nothing runs unless {@code cfg.enableMetaSync} ends up true.
     */
    public void runMetaSync() {
        Set<String> syncClientToolClasses = new HashSet<String>();
        for (String className : this.cfg.syncClientToolClassNames.split(",")) {
            syncClientToolClasses.add(className.trim());
        }
        if (this.cfg.enableHiveSync.booleanValue()) {
            this.cfg.enableMetaSync = true;
            syncClientToolClasses.add(HiveSyncTool.class.getName());
            LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
        }
        if (!this.cfg.enableMetaSync.booleanValue()) {
            return;
        }

        FileSystem fs = FSUtils.getFs(this.cfg.targetBasePath, this.jssc.hadoopConfiguration());
        // Start from the streamer properties and let the write client's config override them.
        TypedProperties metaProps = new TypedProperties();
        metaProps.putAll((Map<?, ?>)this.props);
        metaProps.putAll((Map<?, ?>)this.writeClient.getConfig().getProps());
        if (this.props.getBoolean(HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC.key(), HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
            metaProps.put(HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(this.props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()), this.props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
        }

        for (String impl : syncClientToolClasses) {
            Timer.Context syncContext = this.metrics.getMetaSyncTimerContext();
            SyncUtilHelpers.runHoodieMetaSync(impl, metaProps, this.conf, fs, this.cfg.targetBasePath, this.cfg.baseFileFormat);
            long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0L;
            this.metrics.updateDeltaStreamerMetaSyncMetrics(this.getSyncClassShortName(impl), metaSyncTimeMs);
        }
    }

    /**
     * (Re)creates the write client from the schemas of the configured schema provider.
     *
     * <p>Does nothing when no schema provider is set.
     *
     * @throws IOException propagated from the write client re-initialisation
     */
    public void setupWriteClient() throws IOException {
        if (this.schemaProvider == null) {
            return;
        }
        // Source schema is fetched before the target schema, as arguments evaluate left to right.
        this.reInitWriteClient(this.schemaProvider.getSourceSchema(), this.schemaProvider.getTargetSchema());
    }

    /**
     * Replaces the current write client with a new one built for the given schemas.
     *
     * <p>Steps, in order: optionally strip the partition columns from the target schema, register
     * both schemas with Spark, build the write config, create or re-use the embedded timeline
     * service when the config enables it, close the previous write client (if any), create the
     * new client and hand it to the {@code onInitializingHoodieWriteClient} callback.
     *
     * @param sourceSchema schema of the incoming data
     * @param targetSchema schema to write with, before partition columns are optionally dropped
     * @throws IOException declared by this method; propagated to the caller
     */
    private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
        LOG.info((Object)"Setting up new Hoodie Write Client");

        Schema writeTargetSchema = targetSchema;
        if (this.isDropPartitionColumns()) {
            Set<String> partitionColumns = this.getPartitionColumns(this.keyGenerator, this.props);
            writeTargetSchema = HoodieAvroUtils.removeFields(targetSchema, partitionColumns);
        }
        this.registerAvroSchemas(sourceSchema, writeTargetSchema);

        HoodieWriteConfig writeConfig = this.getHoodieClientConfig(writeTargetSchema);
        if (writeConfig.isEmbeddedTimelineServerEnabled()) {
            if (this.embeddedTimelineService.isPresent()) {
                // A timeline service already exists: only point the new config at it.
                EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(this.embeddedTimelineService.get(), writeConfig);
            } else {
                this.embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService((HoodieEngineContext)new HoodieSparkEngineContext(this.jssc), writeConfig);
            }
        }

        // Close the previous client before it is replaced.
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        this.writeClient = new SparkRDDWriteClient((HoodieEngineContext)new HoodieSparkEngineContext(this.jssc), writeConfig, this.embeddedTimelineService);
        this.onInitializingHoodieWriteClient.apply(this.writeClient);
    }

    /**
     * Builds the write config using the target schema of {@code schemaProvider}.
     *
     * @param schemaProvider provider of the target schema; may be {@code null}, in which case
     *                       the config is built without a schema
     * @return the write config
     */
    private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
        // Keep the argument typed as Schema so the Schema overload is selected even for null.
        Schema targetSchema = null;
        if (schemaProvider != null) {
            targetSchema = schemaProvider.getTargetSchema();
        }
        return this.getHoodieClientConfig(targetSchema);
    }

    /**
     * Builds and validates the write config used by the write client.
     *
     * <p>The config is assembled from the streamer settings ({@code cfg}) and the user properties
     * ({@code props}); {@code props} are applied last on the builder. After building, the method
     * verifies that the properties did not change the settings this class depends on (inline
     * compaction, inline/async clustering, auto commit, combine-before-insert and
     * combine-before-upsert).
     *
     * @param schema schema to write with; when {@code null} no schema is set on the config
     * @return the validated write config
     */
    private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
        final boolean combineBeforeUpsert = true;
        final boolean autoCommit = false;

        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
                .withPath(this.cfg.targetBasePath)
                .combineInput(this.cfg.filterDupes, combineBeforeUpsert)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(this.cfg.isInlineCompactionEnabled()).build())
                .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadClass(this.cfg.payloadClassName).withPayloadOrderingField(this.cfg.sourceOrderingField).build())
                .forTable(this.cfg.targetTableName)
                .withAutoCommit(autoCommit)
                .withProps(this.props);
        if (schema != null) {
            builder.withSchema(this.getSchemaForWriteConfig(schema).toString());
        }

        HoodieWriteConfig config = builder.build();
        if (config.writeCommitCallbackOn()) {
            // Callback-specific settings are only filled in for the matching callback class.
            if (HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
                HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
            }
            if (HoodieWriteCommitPulsarCallback.class.getName().equals(config.getCallbackClass())) {
                HoodieWriteCommitPulsarCallbackConfig.setCallbackPulsarConfigIfNeeded(config);
            }
        }

        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(this.props);
        final String expectation = "%s should be set to %s";
        ValidationUtils.checkArgument(config.inlineCompactionEnabled() == this.cfg.isInlineCompactionEnabled(),
                String.format(expectation, HoodieCompactionConfig.INLINE_COMPACT.key(), this.cfg.isInlineCompactionEnabled()));
        ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
                String.format(expectation, HoodieClusteringConfig.INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
        ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(),
                String.format(expectation, HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), clusteringConfig.isAsyncClusteringEnabled()));
        ValidationUtils.checkArgument(config.shouldAutoCommit() == autoCommit,
                String.format(expectation, HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), autoCommit));
        ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == this.cfg.filterDupes.booleanValue(),
                String.format(expectation, HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), this.cfg.filterDupes));
        ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
                String.format(expectation, HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), combineBeforeUpsert));
        return config;
    }

    /**
     * Picks the schema to put into the write config.
     *
     * <p>Normally this is {@code targetSchema} itself. Only when {@code targetSchema} is mutually
     * compatible with {@code InputBatch.NULL_SCHEMA} (i.e. it is effectively the null schema) and
     * the table already has at least one completed commit, the schema is read from the table
     * instead. If the table schema cannot be resolved because of an
     * {@link IllegalArgumentException}, a warning is logged and {@code targetSchema} is kept.
     *
     * @param targetSchema schema supplied by the schema provider; may be {@code null}
     * @return the table schema when the fallback applies, otherwise {@code targetSchema}
     * @throws HoodieException when any other failure occurs while inspecting the table
     */
    private Schema getSchemaForWriteConfig(Schema targetSchema) {
        Schema newWriteSchema = targetSchema;
        try {
            if (targetSchema != null
                    && SchemaCompatibility.checkReaderWriterCompatibility((Schema)targetSchema, (Schema)InputBatch.NULL_SCHEMA).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
                    && SchemaCompatibility.checkReaderWriterCompatibility((Schema)InputBatch.NULL_SCHEMA, (Schema)targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
                // The target schema is the null schema: try to take the schema from the table.
                HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
                        .setConf(new Configuration(this.fs.getConf()))
                        .setBasePath(this.cfg.targetBasePath)
                        .setPayloadClassName(this.cfg.payloadClassName)
                        .build();
                int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
                if (totalCompleted > 0) {
                    try {
                        TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
                        newWriteSchema = schemaResolver.getTableAvroSchema(false);
                    }
                    catch (IllegalArgumentException e) {
                        // Keep the provider schema, but log the cause so the fallback can be diagnosed.
                        LOG.warn((Object)"Could not fetch schema from table. Falling back to using target schema from schema provider", (Throwable)e);
                    }
                }
            }
            return newWriteSchema;
        }
        catch (Exception e) {
            throw new HoodieException("Failed to fetch schema from table ", e);
        }
    }

    /**
     * Registers the source and target schemas of {@code schemaProvider} with Spark.
     *
     * @param schemaProvider provider to read the schemas from; ignored when {@code null}
     */
    private void registerAvroSchemas(SchemaProvider schemaProvider) {
        if (schemaProvider == null) {
            return;
        }
        Schema source = schemaProvider.getSourceSchema();
        Schema target = schemaProvider.getTargetSchema();
        this.registerAvroSchemas(source, target);
    }

    /**
     * Registers the given Avro schemas on the Spark configuration.
     *
     * <p>Nothing is registered when {@code sourceSchema} is {@code null}, even if a target schema
     * is supplied. A {@code null} target schema is simply left out.
     *
     * @param sourceSchema schema of the incoming data; may be {@code null}
     * @param targetSchema schema of the written data; may be {@code null}
     */
    private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) {
        if (sourceSchema == null) {
            return;
        }
        List<Schema> toRegister = new ArrayList<Schema>();
        toRegister.add(sourceSchema);
        if (null != targetSchema) {
            toRegister.add(targetSchema);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Registering Schema: " + toRegister));
        }
        // Spark expects a Scala list, hence the conversion.
        this.jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(toRegister).toList());
    }

    /**
     * Releases the resources held by this instance: closes the write client (if any) and stops
     * the embedded timeline service (if one is present).
     *
     * <p>The timeline service is stopped even when closing the write client throws; the
     * exception from the write client is still propagated to the caller.
     */
    public void close() {
        try {
            if (null != this.writeClient) {
                this.writeClient.close();
                this.writeClient = null;
            }
        }
        finally {
            // Must run regardless of the outcome above, otherwise a failing client close
            // would leave the embedded timeline server running.
            LOG.info((Object)"Shutting down embedded timeline server");
            if (this.embeddedTimelineService.isPresent()) {
                this.embeddedTimelineService.get().stop();
            }
        }
    }

    /**
     * @return the file system held by this instance
     */
    public FileSystem getFs() {
        return fs;
    }

    /**
     * @return the properties held by this instance (the same object, not a copy)
     */
    public TypedProperties getProps() {
        return props;
    }

    /**
     * @return the delta streamer configuration held by this instance
     */
    public HoodieDeltaStreamer.Config getCfg() {
        return cfg;
    }

    /**
     * @return the commit timeline option currently held by this instance
     */
    public Option<HoodieTimeline> getCommitTimelineOpt() {
        return commitTimelineOpt;
    }

    /**
     * Asks the write client to schedule clustering and returns the result.
     *
     * <p>Despite the getter-style name this is not a plain accessor: it calls
     * {@code scheduleClustering} on the write client, passing an empty option.
     *
     * @return the option returned by the write client, or an empty option when there is no
     *         write client
     */
    public Option<String> getClusteringInstantOpt() {
        if (this.writeClient == null) {
            return Option.empty();
        }
        return this.writeClient.scheduleClustering(Option.empty());
    }

    /**
     * Reads the {@code HoodieTableConfig.DROP_PARTITION_COLUMNS} setting from the properties,
     * using the default declared on that config entry when the property is not set.
     *
     * @return the configured (or default) value of the setting
     */
    private Boolean isDropPartitionColumns() {
        String settingKey = HoodieTableConfig.DROP_PARTITION_COLUMNS.key();
        return this.props.getBoolean(settingKey, HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue());
    }

    /**
     * Returns the partition columns reported by {@code SparkKeyGenUtils} as a set.
     *
     * <p>The comma separated result is split on {@code ","} as is; entries are not trimmed.
     *
     * @param keyGenerator key generator passed to {@code SparkKeyGenUtils.getPartitionColumns}
     * @param props        properties passed to {@code SparkKeyGenUtils.getPartitionColumns}
     * @return the distinct partition column names
     */
    private Set<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
        String commaSeparated = SparkKeyGenUtils.getPartitionColumns((KeyGenerator)keyGenerator, (TypedProperties)props);
        String[] columnNames = commaSeparated.split(",");
        return new HashSet<String>(Arrays.asList(columnNames));
    }
}

