/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import com.codahale.metrics.Timer;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.HoodieSparkSqlWriter;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetaSyncException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.exception.HoodieStreamerWriteException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet;
import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.ErrorTableUtils;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.SparkSampleWritesUtils;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.HoodieAvroDeserializer;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConversions;

public class StreamSync
implements Serializable,
Closeable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
    // Streamer configuration handed to the constructor (target path, table type, checkpoint, retry settings, ...).
    private final HoodieStreamer.Config cfg;
    // Adapter around the configured source; used to fetch batches in row or Avro format.
    private transient SourceFormatAdapter formatAdapter;
    // Schema provider supplied by the caller at construction time; may be null.
    private transient SchemaProvider userProvidedSchemaProvider;
    // Schema provider of the batch currently being written; assigned in syncOnce() when the write client is first set up.
    private transient SchemaProvider schemaProvider;
    // Optional transformer built from cfg.transformerClassNames.
    private transient Option<Transformer> transformer;
    // Key generator class name resolved from the properties in the constructor.
    private String keyGenClassName;
    private transient FileSystem fs;
    private final transient HoodieSparkEngineContext hoodieSparkContext;
    private transient SparkSession sparkSession;
    private transient Configuration conf;
    // Writer/source properties; also read (and mutated) inside the per-partition record-building lambdas.
    private final TypedProperties props;
    // Callback supplied by the caller; presumably invoked when the write client is created - usage is outside this view.
    private transient java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient;
    // Completed commits timeline; empty when the table has just been initialized (see refreshTimeline()).
    private transient Option<HoodieTimeline> commitsTimelineOpt;
    // All-commits timeline (including pending instants); used to find pending compaction/clustering instants.
    private transient Option<HoodieTimeline> allCommitsTimelineOpt;
    // Schemas already seen by syncOnce(); a schema not in this set triggers a write-client re-initialization.
    private final SchemaSet processedSchema;
    private transient Option<EmbeddedTimelineService> embeddedTimelineService = Option.empty();
    // Lazily created write client; null until the first batch is synced.
    private transient SparkRDDWriteClient writeClient;
    // Present only when the error table is enabled in props.
    private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
    // Only assigned when the error table is enabled; otherwise stays null.
    private HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy;
    private transient HoodieIngestionMetrics metrics;
    private transient HoodieMetrics hoodieMetrics;
    // Whether record keys are auto-generated, as reported by KeyGenUtils for the given props.
    private final boolean autoGenerateRecordKeys;

    /**
     * Convenience constructor that wraps the given {@link JavaSparkContext} in a new
     * {@link HoodieSparkEngineContext} and delegates to the engine-context based constructor.
     *
     * @deprecated use the constructor that takes a {@link HoodieSparkEngineContext} directly.
     * @throws IOException propagated from the delegated constructor
     */
    @Deprecated
    public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
        this(cfg, sparkSession, schemaProvider, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
    }

    /**
     * Builds the sync service: stores the collaborators, loads (or initializes) the table timeline,
     * creates the metrics objects, optionally wires the error table writer, and finally creates the
     * source adapter and the optional transformer.
     *
     * @param cfg streamer configuration
     * @param sparkSession Spark session used by the source and the transformer
     * @param schemaProvider user supplied schema provider; may be null (it is wrapped with {@code Option.ofNullable} below)
     * @param props writer and source properties
     * @param hoodieSparkContext engine context wrapping the Spark context
     * @param fs file system holding the target base path
     * @param conf Hadoop configuration
     * @param onInitializingHoodieWriteClient callback stored for later use
     * @throws IOException if refreshing the timeline fails
     */
    public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
        this.cfg = cfg;
        this.hoodieSparkContext = hoodieSparkContext;
        this.sparkSession = sparkSession;
        this.fs = fs;
        this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
        this.props = props;
        this.userProvidedSchemaProvider = schemaProvider;
        this.processedSchema = new SchemaSet();
        this.autoGenerateRecordKeys = KeyGenUtils.enableAutoGenerateRecordKeys(props);
        // The key generator class is resolved from a copy of the props, so the lookup cannot mutate the caller's properties.
        this.keyGenClassName = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName((TypedProperties)new TypedProperties(props));
        // Must run after cfg, fs, props and keyGenClassName are set: refreshTimeline() may initialize the table using all of them.
        this.refreshTimeline();
        this.registerAvroSchemas(schemaProvider);
        // NOTE(review): this.schemaProvider is not assigned by any statement visible in this constructor, so unless
        // registerAvroSchemas() sets it, null is passed to getHoodieClientConfig() in the next two lines - confirm.
        this.metrics = (HoodieIngestionMetrics)ReflectionUtils.loadClass(cfg.ingestionMetricsClass, this.getHoodieClientConfig(this.schemaProvider));
        this.hoodieMetrics = new HoodieMetrics(this.getHoodieClientConfig(this.schemaProvider));
        this.conf = conf;
        // The error table writer and its failure strategy are only created when the error table is enabled.
        if (props.getBoolean(HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(), HoodieErrorTableConfig.ERROR_TABLE_ENABLED.defaultValue())) {
            this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, hoodieSparkContext, fs);
            this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
        }
        // The source needs the metrics object, and both the adapter and the transformer depend on errorTableWriter, so they are created last.
        this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, this.metrics), this.errorTableWriter, Option.of(props));
        this.transformer = UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames), Option.ofNullable(schemaProvider).map(SchemaProvider::getSourceSchema), this.errorTableWriter.isPresent());
    }

    /**
     * Reloads the completed-commits and all-commits timelines of the target table.
     *
     * <p>If the base path does not exist the table is initialized from scratch. If the base path
     * exists but the table properties cannot be loaded, and the properties file or its backup is
     * missing, the table is re-initialized; should the re-initialized table turn out to already
     * contain instants, the freshly written properties file is deleted again and the sync fails.
     *
     * @throws IOException on file system failures
     * @throws HoodieException if the table type is neither copy-on-write nor merge-on-read
     * @throws HoodieIOException if the table properties are missing on a table that already has instants,
     *         or for any other load failure unrelated to the properties file
     */
    public void refreshTimeline() throws IOException {
        if (!this.fs.exists(new Path(this.cfg.targetBasePath))) {
            this.initializeEmptyTable();
            return;
        }
        try {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(new Configuration(this.fs.getConf()))
                .setBasePath(this.cfg.targetBasePath)
                .setPayloadClassName(this.cfg.payloadClassName)
                .setRecordMergerStrategy(this.props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
                .build();
            switch (metaClient.getTableType()) {
                case COPY_ON_WRITE:
                case MERGE_ON_READ:
                    this.commitsTimelineOpt = Option.of(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
                    this.allCommitsTimelineOpt = Option.of(metaClient.getActiveTimeline().getAllCommitsTimeline());
                    break;
                default:
                    throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
            }
        } catch (HoodieIOException e) {
            LOG.warn("Full exception msg " + e.getMessage());
            // Only a failure to load hoodie.properties is recoverable here; anything else is rethrown untouched.
            boolean propertiesLoadFailure = e.getMessage().contains("Could not load Hoodie properties")
                && e.getMessage().contains("hoodie.properties");
            if (!propertiesLoadFailure) {
                throw e;
            }
            String basePathWithForwardSlash = this.cfg.targetBasePath.endsWith("/")
                ? this.cfg.targetBasePath
                : String.format("%s/", this.cfg.targetBasePath);
            String pathToHoodieProps = String.format("%s%s/%s", basePathWithForwardSlash, ".hoodie", "hoodie.properties");
            String pathToHoodiePropsBackup = String.format("%s%s/%s", basePathWithForwardSlash, ".hoodie", "hoodie.properties.backup");
            boolean propertiesAndBackupExist = this.fs.exists(new Path(basePathWithForwardSlash))
                && this.fs.exists(new Path(pathToHoodieProps))
                && this.fs.exists(new Path(pathToHoodiePropsBackup));
            if (propertiesAndBackupExist) {
                return;
            }
            LOG.warn("Base path exists, but table is not fully initialized. Re-initializing again");
            this.initializeEmptyTable();
            // A re-initialized table must be empty; existing instants mean the properties file was removed from a live table.
            HoodieTableMetaClient metaClientToValidate = HoodieTableMetaClient.builder()
                .setConf(new Configuration(this.fs.getConf()))
                .setBasePath(this.cfg.targetBasePath)
                .build();
            if (metaClientToValidate.reloadActiveTimeline().countInstants() > 0) {
                this.fs.delete(new Path(pathToHoodieProps));
                throw new HoodieIOException("hoodie.properties is missing. Likely due to some external entity. Please populate the hoodie.properties and restart the pipeline. ", e.getIOException());
            }
        }
    }

    /**
     * Resets both timelines to empty and creates the table under {@code cfg.targetBasePath},
     * taking the table properties from the streamer config and the writer properties.
     *
     * @throws IOException if the table cannot be initialized
     */
    private void initializeEmptyTable() throws IOException {
        this.commitsTimelineOpt = Option.empty();
        this.allCommitsTimelineOpt = Option.empty();
        String partitionColumns = SparkKeyGenUtils.getPartitionColumns(this.props);
        HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(this.cfg.tableType)
            .setTableName(this.cfg.targetTableName)
            .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
            .setPayloadClassName(this.cfg.payloadClassName)
            .setBaseFileFormat(this.cfg.baseFileFormat)
            .setPartitionFields(partitionColumns)
            .setRecordKeyFields(this.props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
            .setPopulateMetaFields(this.props.getBoolean(
                HoodieTableConfig.POPULATE_META_FIELDS.key(),
                HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
            .setKeyGeneratorClassProp(this.keyGenClassName)
            .setPreCombineField(this.cfg.sourceOrderingField)
            .setPartitionMetafileUseBaseFormat(this.props.getBoolean(
                HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
                HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
            .setCDCEnabled(this.props.getBoolean(
                HoodieTableConfig.CDC_ENABLED.key(),
                HoodieTableConfig.CDC_ENABLED.defaultValue()))
            .setCDCSupplementalLoggingMode(this.props.getString(
                HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(),
                HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.defaultValue()))
            .setShouldDropPartitionColumns(this.isDropPartitionColumns())
            .setHiveStylePartitioningEnable(this.props.getBoolean(
                HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key(),
                Boolean.parseBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())))
            .setUrlEncodePartitioning(this.props.getBoolean(
                HoodieTableConfig.URL_ENCODE_PARTITIONING.key(),
                Boolean.parseBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue())))
            .initTable(new Configuration(this.hoodieSparkContext.hadoopConfiguration()), this.cfg.targetBasePath);
    }

    /**
     * Runs one sync round: refreshes the timeline, reads the next batch from the source, makes sure
     * the write client matches the batch schemas, optionally finishes a pending inline compaction
     * or clustering, and writes the batch to the sink.
     *
     * @return the result of {@code writeToSink}, or {@code null} when the source produced nothing to sync
     * @throws IOException on timeline, source or write failures
     */
    public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException {
        Timer.Context overallTimerContext = this.metrics.getOverallTimerContext();
        this.refreshTimeline();
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> sourceBatch = this.readFromSource(instantTime);

        Pair<Option<String>, JavaRDD<WriteStatus>> syncResult = null;
        if (sourceBatch != null) {
            JavaRDD<HoodieRecord> incomingRecords = sourceBatch.getRight().getRight();
            if (this.writeClient != null) {
                // A write client already exists: rebuild it only when this batch carries a schema not seen before.
                Schema batchSourceSchema = sourceBatch.getKey().getSourceSchema();
                Schema batchTargetSchema = sourceBatch.getKey().getTargetSchema();
                boolean schemasAlreadyProcessed = this.processedSchema.isSchemaPresent(batchSourceSchema)
                    && this.processedSchema.isSchemaPresent(batchTargetSchema);
                if (!schemasAlreadyProcessed) {
                    LOG.info("Seeing new schema. Source :" + batchSourceSchema.toString(true) + ", Target :" + batchTargetSchema.toString(true));
                    this.reInitWriteClient(batchSourceSchema, batchTargetSchema, incomingRecords);
                    this.processedSchema.addSchema(batchSourceSchema);
                    this.processedSchema.addSchema(batchTargetSchema);
                }
            } else {
                // First batch: adopt its schema provider and create the write client.
                this.schemaProvider = sourceBatch.getKey();
                this.setupWriteClient(incomingRecords);
            }

            // Finish a pending table service before writing. Clustering is only considered when the compaction retry is not configured.
            if (this.cfg.retryLastPendingInlineCompactionJob && this.getHoodieClientConfig(this.schemaProvider).inlineCompactionEnabled()) {
                Option<String> compactionInstant = this.getLastPendingCompactionInstant(this.allCommitsTimelineOpt);
                if (compactionInstant.isPresent()) {
                    HoodieWriteMetadata writeMetadata = this.writeClient.compact(compactionInstant.get());
                    this.writeClient.commitCompaction(compactionInstant.get(), writeMetadata.getCommitMetadata().get(), Option.empty());
                    this.refreshTimeline();
                    this.reInitWriteClient(this.schemaProvider.getSourceSchema(), this.schemaProvider.getTargetSchema(), null);
                }
            } else if (this.cfg.retryLastPendingInlineClusteringJob && this.getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
                Option<String> clusteringInstant = this.getLastPendingClusteringInstant(this.allCommitsTimelineOpt);
                if (clusteringInstant.isPresent()) {
                    this.writeClient.cluster(clusteringInstant.get());
                }
            }

            String checkpointForNextBatch = sourceBatch.getRight().getLeft();
            syncResult = this.writeToSink(instantTime, incomingRecords, checkpointForNextBatch, this.metrics, overallTimerContext);
        }
        this.metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
        return syncResult;
    }

    /**
     * Finds the timestamp of the latest pending replace (clustering) instant.
     *
     * @param commitTimelineOpt timeline to inspect; may be empty
     * @return the instant timestamp, or empty when there is no timeline or no pending replace instant
     */
    private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> commitTimelineOpt) {
        if (!commitTimelineOpt.isPresent()) {
            return Option.empty();
        }
        Option<HoodieInstant> latestPending = commitTimelineOpt.get().filterPendingReplaceTimeline().lastInstant();
        if (!latestPending.isPresent()) {
            return Option.empty();
        }
        return Option.of(latestPending.get().getTimestamp());
    }

    /**
     * Finds the timestamp of the latest pending compaction instant.
     *
     * @param commitTimelineOpt timeline to inspect; may be empty
     * @return the instant timestamp, or empty when there is no timeline or no pending compaction
     */
    private Option<String> getLastPendingCompactionInstant(Option<HoodieTimeline> commitTimelineOpt) {
        if (!commitTimelineOpt.isPresent()) {
            return Option.empty();
        }
        Option<HoodieInstant> latestPending = commitTimelineOpt.get().filterPendingCompactionTimeline().lastInstant();
        if (!latestPending.isPresent()) {
            return Option.empty();
        }
        return Option.of(latestPending.get().getTimestamp());
    }

    /**
     * Resolves the checkpoint to resume from and fetches the next batch from the source, retrying
     * on {@link HoodieSourceTimeoutException} when {@code cfg.retryOnSourceFailures} is enabled.
     *
     * <p>The resume checkpoint comes from the completed commits timeline when available, otherwise
     * from {@code cfg.checkpoint}. A fetch that completes without throwing ends the retry loop, even
     * when it returns {@code null} (meaning there is nothing to commit); only a source timeout
     * triggers another attempt.
     *
     * @param instantTime instant time of the commit this batch will be written with
     * @return schema provider, next checkpoint and records of the batch; {@code null} when there is
     *         nothing to commit or when the configured maximum retry count is not positive
     * @throws IOException if the resume checkpoint cannot be read from the commit metadata
     * @throws HoodieSourceTimeoutException if the source still times out on the last attempt
     */
    public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(String instantTime) throws IOException {
        Option<String> resumeCheckpointStr = Option.empty();
        if (this.commitsTimelineOpt.isPresent()) {
            resumeCheckpointStr = this.getCheckpointToResume(this.commitsTimelineOpt);
        }
        LOG.debug("Checkpoint from config: " + this.cfg.checkpoint);
        if (!resumeCheckpointStr.isPresent() && this.cfg.checkpoint != null) {
            resumeCheckpointStr = Option.of(this.cfg.checkpoint);
        }
        LOG.info("Checkpoint to resume from : " + resumeCheckpointStr);

        // Without retries enabled exactly one attempt is made.
        int maxRetryCount = this.cfg.retryOnSourceFailures ? this.cfg.maxRetryCount : 1;
        for (int attempt = 1; attempt <= maxRetryCount; attempt++) {
            try {
                // Return right away on success: a null result means "no new data" and must not be mistaken for a failed attempt.
                return this.fetchFromSource(resumeCheckpointStr, instantTime);
            } catch (HoodieSourceTimeoutException e) {
                if (attempt >= maxRetryCount) {
                    throw e;
                }
                LOG.error("Exception thrown while fetching data from source. Msg : " + e.getMessage() + ", class : " + e.getClass() + ", cause : " + e.getCause());
                LOG.error("Sleeping for " + this.cfg.retryIntervalSecs + " before retrying again. Current retry count " + attempt + ", max retry count " + this.cfg.maxRetryCount);
                try {
                    // Multiply as long so large intervals cannot overflow int before being widened.
                    Thread.sleep(this.cfg.retryIntervalSecs * 1000L);
                } catch (InterruptedException interrupted) {
                    // The interrupt only cuts the wait short; the next attempt still runs, as before.
                    // NOTE(review): the interrupt status is not restored here, matching the previous behavior - confirm this is intended.
                    LOG.error("Ignoring InterruptedException while waiting to retry on source failure " + interrupted.getMessage());
                }
            }
        }
        return null;
    }

    /**
     * Fetches the next batch from the source and turns it into an RDD of {@link HoodieRecord}s.
     *
     * <p>When a transformer is configured the batch is fetched as rows, transformed, and converted to
     * Avro using either the user provided target schema or a writer schema deduced from the transformed
     * data and the latest table schema. Without a transformer the batch is fetched directly in Avro format.
     *
     * @param resumeCheckpointStr checkpoint to resume from; may be empty
     * @param instantTime instant time, used for auto-generated record keys
     * @return {@code null} when the checkpoint did not change and commits without checkpoint change are
     *         not allowed; otherwise the schema provider, the checkpoint for the next batch and the
     *         records (an empty RDD when the batch is absent or empty)
     * @throws UnsupportedOperationException for Spark records on a merge-on-read table whose log block
     *         format is not parquet, or for a record type other than AVRO or SPARK
     */
    private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource(Option<String> resumeCheckpointStr, String instantTime) {
        JavaRDD records;
        SchemaProvider schemaProvider;
        // NOTE(review): declared by the decompiler as Option<Dataset<Row>>, but the branches below assign and read it as an optional JavaRDD of Avro records.
        Option<Dataset<Row>> avroRDDOptional;
        String checkpointStr;
        InputBatch<Dataset<Row>> dataAndCheckpoint;
        HoodieRecord.HoodieRecordType recordType = UtilHelpers.createRecordMerger(this.props).getRecordType();
        // Spark records on a merge-on-read table are only accepted with the parquet log block format.
        if (recordType == HoodieRecord.HoodieRecordType.SPARK && HoodieTableType.valueOf(this.cfg.tableType) == HoodieTableType.MERGE_ON_READ && HoodieLogBlock.HoodieLogBlockType.fromId(this.props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro")) != HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
            throw new UnsupportedOperationException("Spark record only support parquet log.");
        }
        if (this.transformer.isPresent()) {
            // Transformer path: fetch rows, transform them, then let the adapter process error events for the transformed data.
            dataAndCheckpoint = this.formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, this.cfg.sourceLimit);
            Option<Dataset<Row>> transformed = dataAndCheckpoint.getBatch().map(data -> this.transformer.get().apply(this.hoodieSparkContext.jsc(), this.sparkSession, (Dataset<Row>)data, this.props));
            transformed = this.formatAdapter.processErrorEvents(transformed, ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
            checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
            boolean reconcileSchema = this.props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
            if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
                // A user provided target schema exists. With the error table and target-schema validation enabled, rows that
                // fail the conversion are reported as AVRO_DESERIALIZATION_FAILURE error events; otherwise a plain conversion is used.
                avroRDDOptional = this.errorTableWriter.isPresent() && this.props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()) ? transformed.map(rowDataset -> {
                    Tuple2 safeCreateRDDs = HoodieSparkUtils.safeCreateRDD((Dataset)rowDataset, (String)"hoodie_source", (String)"hoodie.source", (boolean)reconcileSchema, Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
                    this.errorTableWriter.get().addErrorEvents(((RDD)safeCreateRDDs._2()).toJavaRDD().map((Function & Serializable)evStr -> new ErrorEvent<String>((String)evStr, ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
                    return ((RDD)safeCreateRDDs._1).toJavaRDD();
                }) : transformed.map(rowDataset -> this.getTransformedRDD((Dataset<Row>)rowDataset, reconcileSchema, this.userProvidedSchemaProvider.getTargetSchema()));
                schemaProvider = this.userProvidedSchemaProvider;
            } else {
                // No usable user schema: deduce the writer schema from the transformed data and the latest table schema.
                Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(this.hoodieSparkContext.jsc(), this.fs, this.cfg.targetBasePath);
                Option<Schema> targetSchemaOpt = transformed.map(df -> {
                    Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema((DataType)df.schema(), (String)latestTableSchemaOpt.map(Schema::getFullName).orElse(AvroSchemaUtils.getAvroRecordQualifiedName(this.cfg.targetTableName)));
                    return HoodieSparkSqlWriter.deduceWriterSchema((Schema)sourceSchema, (scala.Option)HoodieConversionUtils.toScalaOption((Option)latestTableSchemaOpt), (scala.Option)HoodieConversionUtils.toScalaOption(Option.empty()), (scala.collection.immutable.Map)HoodieConversionUtils.fromProperties((TypedProperties)this.props));
                });
                // Falls back to the batch's own schema provider when there is no transformed data.
                schemaProvider = targetSchemaOpt.map(targetSchema -> new DelegatingSchemaProvider(this.props, this.hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(), new SimpleSchemaProvider(this.hoodieSparkContext.jsc(), (Schema)targetSchema, this.props))).orElse(dataAndCheckpoint.getSchemaProvider());
                avroRDDOptional = transformed.map(t -> this.getTransformedRDD((Dataset<Row>)t, reconcileSchema, schemaProvider.getTargetSchema()));
            }
        } else {
            // No transformer: the source adapter delivers Avro records directly.
            dataAndCheckpoint = this.formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, this.cfg.sourceLimit);
            avroRDDOptional = dataAndCheckpoint.getBatch();
            checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
            schemaProvider = dataAndCheckpoint.getSchemaProvider();
        }
        // An unchanged checkpoint means nothing new to commit, unless commits without checkpoint change are explicitly allowed.
        if (!this.cfg.allowCommitOnNoCheckpointChange.booleanValue() && Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
            LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
            String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
            this.hoodieMetrics.updateMetricsForEmptyData(commitActionType);
            return null;
        }
        this.hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
        // The checkpoint moved but there is no data: return an empty RDD so the caller can still commit the new checkpoint.
        if (!avroRDDOptional.isPresent() || ((JavaRDD)avroRDDOptional.get()).isEmpty()) {
            LOG.info("No new data, perform empty commit.");
            return Pair.of(schemaProvider, Pair.of(checkpointStr, this.hoodieSparkContext.emptyRDD()));
        }
        // The ordering value is only read from the record when duplicates are filtered or the operation is an upsert.
        boolean shouldCombine = this.cfg.filterDupes != false || this.cfg.operation.equals((Object)WriteOperationType.UPSERT);
        Set<String> partitionColumns = this.getPartitionColumns(this.props);
        JavaRDD avroRDD = (JavaRDD)avroRDDOptional.get();
        SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
        // With drop-partition-columns enabled the metadata fields are removed from the target schema before it is used by the SPARK branch below.
        SerializableSchema processedAvroSchema = new SerializableSchema(this.isDropPartitionColumns() != false ? HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
        if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
            // AVRO record type: build one HoodieAvroRecord per input record; each partition is collected into a list before being returned.
            records = avroRDD.mapPartitions((FlatMapFunction & Serializable)genericRecordIterator -> {
                if (this.autoGenerateRecordKeys) {
                    // Auto-generated keys need the Spark partition id and the instant time, handed to the key generator through props.
                    this.props.setProperty("_hoodie.record.key.gen.partition.id", String.valueOf(TaskContext.getPartitionId()));
                    this.props.setProperty("_hoodie.record.key.gen.instant.time", instantTime);
                }
                BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)this.props);
                ArrayList<HoodieAvroRecord<HoodieRecordPayload>> avroRecords = new ArrayList<HoodieAvroRecord<HoodieRecordPayload>>();
                while (genericRecordIterator.hasNext()) {
                    GenericRecord genRec = (GenericRecord)genericRecordIterator.next();
                    // The key is computed from the full record, before partition columns are optionally removed from the payload.
                    HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
                    GenericRecord gr = this.isDropPartitionColumns() != false ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
                    HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload((String)this.cfg.payloadClassName, (GenericRecord)gr, (Comparable)((Comparable)HoodieAvroUtils.getNestedFieldVal(gr, this.cfg.sourceOrderingField, false, this.props.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))) : DataSourceUtils.createPayload((String)this.cfg.payloadClassName, (GenericRecord)gr);
                    avroRecords.add(new HoodieAvroRecord<HoodieRecordPayload>(hoodieKey, payload));
                }
                return avroRecords.iterator();
            });
        } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
            // SPARK record type: deserialize each Avro record to an InternalRow and wrap it lazily while iterating.
            records = avroRDD.mapPartitions((FlatMapFunction & Serializable)itr -> {
                if (this.autoGenerateRecordKeys) {
                    this.props.setProperty("_hoodie.record.key.gen.partition.id", String.valueOf(TaskContext.getPartitionId()));
                    this.props.setProperty("_hoodie.record.key.gen.instant.time", instantTime);
                }
                BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)this.props);
                StructType baseStructType = AvroConversionUtils.convertAvroSchemaToStructType((Schema)processedAvroSchema.get());
                // The target struct drops the partition columns when configured; otherwise it equals the base struct.
                StructType targetStructType = this.isDropPartitionColumns() != false ? AvroConversionUtils.convertAvroSchemaToStructType((Schema)HoodieAvroUtils.removeFields(processedAvroSchema.get(), partitionColumns)) : baseStructType;
                HoodieAvroDeserializer deserializer = SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(), (DataType)baseStructType);
                return new CloseableMappingIterator<GenericRecord, HoodieRecord>(ClosableIterator.wrap(itr), rec -> {
                    InternalRow row = (InternalRow)deserializer.deserialize(rec).get();
                    // Keys are computed against the base struct; the stored row is projected onto the target struct.
                    String recordKey = builtinKeyGenerator.getRecordKey(row, baseStructType).toString();
                    String partitionPath = builtinKeyGenerator.getPartitionPath(row, baseStructType).toString();
                    return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath), (InternalRow)HoodieInternalRowUtils.getCachedUnsafeProjection((StructType)baseStructType, (StructType)targetStructType).apply(row), targetStructType, false);
                });
            });
        } else {
            throw new UnsupportedOperationException(recordType.name());
        }
        return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
    }

    /**
     * Converts a dataset of rows into an RDD of Avro records.
     *
     * @param rowDataset rows to convert
     * @param reconcileSchema reconcile flag forwarded to the conversion
     * @param readerSchema schema forwarded to the conversion; may be null
     * @return the converted records as a Java RDD
     */
    private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset, boolean reconcileSchema, Schema readerSchema) {
        Option<Schema> readerSchemaOpt = Option.ofNullable(readerSchema);
        RDD<GenericRecord> avroRecords = HoodieSparkUtils.createRdd(rowDataset, "hoodie_source", "hoodie.source", reconcileSchema, readerSchemaOpt);
        return avroRecords.toJavaRDD();
    }

    /**
     * Determines the checkpoint the next fetch should resume from, based on the latest commit
     * metadata that carries checkpoint information.
     *
     * <p>Delta commits are preferred when the timeline contains any. A configured checkpoint wins
     * when it differs from the reset key recorded in the metadata (or no reset key is recorded);
     * otherwise the checkpoint stored in the metadata is used.
     *
     * @param commitsTimelineOpt completed commits timeline; must be present
     * @return the checkpoint to resume from, or empty when none can be determined
     * @throws IOException if the commit metadata cannot be read or rendered
     * @throws HoodieStreamerException if the metadata holds no checkpoint and the last commit is
     *         later than the "00000000000002" timestamp
     */
    private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
        HoodieTimeline deltaCommitsOnly = commitsTimelineOpt.get().filter(instant -> instant.getAction().equals("deltacommit"));
        // Prefer the delta commits when there are any; otherwise keep the timeline that was passed in.
        Option<HoodieTimeline> timelineOpt = deltaCommitsOnly.empty() ? commitsTimelineOpt : Option.of(deltaCommitsOnly);

        Option<HoodieInstant> lastCommit = timelineOpt.get().lastInstant();
        if (!lastCommit.isPresent()) {
            return Option.empty();
        }

        Option<HoodieCommitMetadata> commitMetadataOption = this.getLatestCommitMetadataWithValidCheckpointInfo(timelineOpt.get());
        if (!commitMetadataOption.isPresent()) {
            if (this.cfg.checkpoint != null) {
                return Option.of(this.cfg.checkpoint);
            }
            return Option.empty();
        }

        HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
        String resetKey = commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key");
        LOG.debug("Checkpoint reset from metadata: " + resetKey);
        boolean resetKeyRecorded = !StringUtils.isNullOrEmpty(resetKey);

        Option<String> resumeCheckpointStr = Option.empty();
        boolean configuredCheckpointIsNew = this.cfg.checkpoint != null
            && (!resetKeyRecorded || !this.cfg.checkpoint.equals(resetKey));
        if (configuredCheckpointIsNew) {
            resumeCheckpointStr = Option.of(this.cfg.checkpoint);
        } else {
            String storedCheckpoint = commitMetadata.getMetadata("deltastreamer.checkpoint.key");
            if (!StringUtils.isNullOrEmpty(storedCheckpoint)) {
                resumeCheckpointStr = Option.of(storedCheckpoint);
            } else if (HoodieTimeline.compareTimestamps("00000000000002", HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
                throw new HoodieStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" + timelineOpt.get().getInstants() + ", CommitMetadata=" + commitMetadata.toJsonString());
            }
        }
        // When a reset key is recorded in the metadata, the Kafka checkpoint type config is removed from the props.
        if (resetKeyRecorded) {
            ConfigUtils.removeConfigFromProps(this.props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE);
        }
        return resumeCheckpointStr;
    }

    /**
     * Scans the timeline from newest to oldest and returns the first instant whose commit metadata has a
     * non-empty {@code deltastreamer.checkpoint.key} or {@code deltastreamer.checkpoint.reset_key}.
     *
     * @param timeline timeline to scan in reverse order
     * @return a pair of the instant's {@code toString()} and its commit metadata, or an empty option when
     *         no instant carries checkpoint information
     * @throws HoodieIOException if the commit metadata of a visited instant cannot be parsed
     * @throws IOException declared for callers; parse failures inside the stream surface as
     *         {@code HoodieIOException}
     */
    protected Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
        return timeline.getReverseOrderedInstants()
                .map(candidate -> {
                    HoodieCommitMetadata metadata;
                    try {
                        metadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(candidate).get(), HoodieCommitMetadata.class);
                    } catch (IOException e) {
                        throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + candidate.toString(), e);
                    }
                    Option<Pair<String, HoodieCommitMetadata>> match = Option.empty();
                    boolean carriesCheckpointInfo = !StringUtils.isNullOrEmpty(metadata.getMetadata("deltastreamer.checkpoint.key"))
                            || !StringUtils.isNullOrEmpty(metadata.getMetadata("deltastreamer.checkpoint.reset_key"));
                    if (carriesCheckpointInfo) {
                        match = Option.of(Pair.of(candidate.toString(), metadata));
                    }
                    return match;
                })
                .filter(Option::isPresent)
                .findFirst()
                .orElse(Option.empty());
    }

    /**
     * Returns only the commit-metadata half of
     * {@link #getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline)}.
     *
     * @param timeline timeline to search
     * @return the commit metadata found by the delegate, or an empty option
     * @throws IOException propagated from the delegate
     */
    protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
        Option<Pair<String, HoodieCommitMetadata>> latest = this.getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline);
        return latest.map(Pair::getRight);
    }

    /**
     * Returns only the instant half of
     * {@link #getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline)}, wrapping the
     * checked {@code IOException} in a {@code HoodieIOException}.
     *
     * @param timelineOpt optional timeline to search
     * @return the instant string found by the delegate; empty when no timeline is given or nothing matches
     * @throws HoodieIOException if the delegate throws {@code IOException}
     */
    protected Option<String> getLatestInstantWithValidCheckpointInfo(Option<HoodieTimeline> timelineOpt) {
        if (!timelineOpt.isPresent()) {
            return Option.empty();
        }
        try {
            return this.getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timelineOpt.get()).map(Pair::getLeft);
        } catch (IOException e) {
            throw new HoodieIOException("failed to get latest instant with ValidCheckpointInfo", e);
        }
    }

    /**
     * Writes one batch of records to the target table and commits it.
     *
     * <p>Steps, in order: optionally drop duplicates, start the commit, run the configured write
     * operation, roll back and fail if records errored (unless {@code cfg.commitOnErrors}), commit the
     * error table if one is configured, commit the batch with checkpoint metadata, notify the source,
     * optionally schedule compaction, optionally run meta sync, and report the overall time.
     *
     * @param instantTime         requested instant time; may be replaced by the value {@code startCommit} returns
     * @param records             records to write
     * @param checkpointStr       checkpoint stored in the commit metadata and passed to the source's {@code onCommit}; may be {@code null}
     * @param metrics             receives the overall sync duration
     * @param overallTimerContext timer stopped at the end to obtain the overall duration; may be {@code null}
     * @return pair of the scheduled compaction instant (empty unless async compaction is enabled) and the write statuses
     * @throws HoodieStreamerException      if {@code cfg.operation} is not one of the handled operations
     * @throws HoodieStreamerWriteException if the write had errors and was rolled back, the error table
     *                                      commit failed under a failing strategy, or the commit itself failed
     */
    private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String instantTime, JavaRDD<HoodieRecord> records, String checkpointStr, HoodieIngestionMetrics metrics, Timer.Context overallTimerContext) {
        boolean success;
        boolean hasErrors;
        JavaRDD writeStatusRDD;
        Option scheduledCompactionInstant = Option.empty();
        if (this.cfg.filterDupes.booleanValue()) {
            records = DataSourceUtils.dropDuplicates((JavaSparkContext)this.hoodieSparkContext.jsc(), (JavaRDD)records, (HoodieWriteConfig)this.writeClient.getConfig());
        }
        // Captured before the write; decides further down whether meta sync runs.
        boolean isEmpty = records.isEmpty();
        // startCommit may return a different instant time when retries are enabled.
        instantTime = this.startCommit(instantTime, !this.autoGenerateRecordKeys);
        LOG.info("Starting commit  : " + instantTime);
        // Only the overwrite / delete-partition operations replace this with real file ids.
        Map partitionToReplacedFileIds = Collections.emptyMap();
        switch (this.cfg.operation) {
            case INSERT: {
                writeStatusRDD = this.writeClient.insert(records, instantTime);
                break;
            }
            case UPSERT: {
                writeStatusRDD = this.writeClient.upsert(records, instantTime);
                break;
            }
            case BULK_INSERT: {
                writeStatusRDD = this.writeClient.bulkInsert(records, instantTime);
                break;
            }
            case INSERT_OVERWRITE: {
                HoodieWriteResult writeResult = this.writeClient.insertOverwrite(records, instantTime);
                partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
                writeStatusRDD = writeResult.getWriteStatuses();
                break;
            }
            case INSERT_OVERWRITE_TABLE: {
                HoodieWriteResult writeResult = this.writeClient.insertOverwriteTable(records, instantTime);
                partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
                writeStatusRDD = writeResult.getWriteStatuses();
                break;
            }
            case DELETE_PARTITION: {
                // The partitions to delete are the distinct partition paths of the incoming records.
                List partitions = records.map((Function & Serializable)record -> record.getPartitionPath()).distinct().collect();
                HoodieWriteResult writeResult = this.writeClient.deletePartitions(partitions, instantTime);
                partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
                writeStatusRDD = writeResult.getWriteStatuses();
                break;
            }
            default: {
                throw new HoodieStreamerException("Unknown operation : " + (Object)((Object)this.cfg.operation));
            }
        }
        long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
        long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
        boolean bl = hasErrors = totalErrorRecords > 0L;
        if (hasErrors && !this.cfg.commitOnErrors.booleanValue()) {
            // Errors are not tolerated: log a sample, roll the instant back and fail the round.
            LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
            LOG.error("Printing out the top 100 errors");
            writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
                LOG.error("Global error :", ws.getGlobalError());
                if (ws.getErrors().size() > 0) {
                    // NOTE(review): per-key errors are logged at TRACE while the heading above is ERROR - confirm intended.
                    ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
                }
            });
            this.writeClient.rollback(instantTime);
            throw new HoodieStreamerWriteException("Commit " + instantTime + " failed and rolled-back !");
        }
        // Checkpoint entries go into the commit metadata unless CHECKPOINT_FORCE_SKIP is set.
        HashMap<String, String> checkpointCommitMetadata = new HashMap<String, String>();
        if (!ConfigUtils.getBooleanWithAltKeys(this.props, HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP)) {
            if (checkpointStr != null) {
                checkpointCommitMetadata.put("deltastreamer.checkpoint.key", checkpointStr);
            }
            if (this.cfg.checkpoint != null) {
                checkpointCommitMetadata.put("deltastreamer.checkpoint.reset_key", this.cfg.checkpoint);
            }
        }
        if (hasErrors) {
            LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
        }
        String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
        if (this.errorTableWriter.isPresent()) {
            // The error table is committed before the data commit below.
            Option<String> commitedInstantTime = this.getLatestInstantWithValidCheckpointInfo(this.commitsTimelineOpt);
            boolean errorTableSuccess = this.errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
            if (!errorTableSuccess) {
                switch (this.errorWriteFailureStrategy) {
                    case ROLLBACK_COMMIT: {
                        LOG.info("Commit " + instantTime + " failed!");
                        this.writeClient.rollback(instantTime);
                        throw new HoodieStreamerWriteException("Error table commit failed");
                    }
                    case LOG_ERROR: {
                        LOG.error("Error Table write failed for instant " + instantTime);
                        break;
                    }
                    default: {
                        throw new HoodieStreamerWriteException("Write failure strategy not implemented for " + (Object)((Object)this.errorWriteFailureStrategy));
                    }
                }
            }
        }
        if (!(success = this.writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty()))) {
            LOG.info("Commit " + instantTime + " failed!");
            throw new HoodieStreamerWriteException("Commit " + instantTime + " failed!");
        }
        LOG.info("Commit " + instantTime + " successful!");
        // Tell the source the checkpoint has been committed.
        this.formatAdapter.getSource().onCommit(checkpointStr);
        if (this.cfg.isAsyncCompactionEnabled()) {
            scheduledCompactionInstant = this.writeClient.scheduleCompaction(Option.empty());
        }
        // Meta sync is skipped for an empty batch unless forceEmptyMetaSync is set.
        if (!isEmpty || this.cfg.forceEmptyMetaSync.booleanValue()) {
            this.runMetaSync();
        }
        long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0L;
        metrics.updateStreamerMetrics(overallTimeMs);
        return Pair.of(scheduledCompactionInstant, writeStatusRDD);
    }

    /**
     * Starts a new commit on the write client, optionally retrying with a freshly generated instant time.
     *
     * <p>Up to {@code maxRetries} attempts are made. After a failed attempt (when retrying is enabled)
     * the method sleeps for one second and generates a new instant time via
     * {@code HoodieActiveTimeline.createNewInstantTime()}.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt status is restored so that callers
     * can observe it; the remaining attempts still run.
     *
     * @param instantTime  instant time to try first
     * @param retryEnabled whether an {@code IllegalArgumentException} from the write client triggers a retry
     * @return the instant time with which the commit was actually started
     * @throws IllegalArgumentException the write client's failure, rethrown immediately when retrying is
     *                                  disabled or after the last attempt otherwise
     */
    private String startCommit(String instantTime, boolean retryEnabled) {
        final int maxRetries = 2;
        int retryNum = 1;
        IllegalArgumentException lastException = null;
        while (retryNum <= maxRetries) {
            try {
                String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
                this.writeClient.startCommitWithTime(instantTime, commitActionType);
                return instantTime;
            }
            catch (IllegalArgumentException ie) {
                lastException = ie;
                if (!retryEnabled) {
                    throw ie;
                }
                LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", (Throwable)ie);
                ++retryNum;
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interrupted) {
                    // Do not swallow the interrupt: restore the flag so the caller can react to it.
                    Thread.currentThread().interrupt();
                }
                instantTime = HoodieActiveTimeline.createNewInstantTime();
            }
        }
        throw lastException;
    }

    /**
     * Returns the part of a class name that follows its last {@code '.'}.
     *
     * @param syncClassName class name, with or without a package prefix
     * @return the text after the last dot, or the whole input when it contains no dot
     */
    private String getSyncClassShortName(String syncClassName) {
        int lastDot = syncClassName.lastIndexOf('.');
        if (lastDot < 0) {
            return syncClassName;
        }
        return syncClassName.substring(lastDot + 1);
    }

    /**
     * Runs every configured meta-sync tool against the target table.
     *
     * <p>The tool class names come from the comma-separated {@code cfg.syncClientToolClassNames}. Each
     * name is trimmed once up front, so whitespace variants of the same class are de-duplicated and the
     * same trimmed name is used for the invocation, the failure map and the metric name. When
     * {@code cfg.enableHiveSync} is set, meta sync is switched on and {@code HiveSyncTool} is added.
     *
     * <p>Nothing is synced unless {@code cfg.enableMetaSync} is set. A {@code HoodieMetaSyncException}
     * from one tool does not stop the others; all such failures are collected and thrown together
     * after every tool has run.
     *
     * @throws HoodieMetaSyncException if at least one sync tool failed
     */
    public void runMetaSync() {
        Set<String> syncClientToolClasses = Arrays.stream(this.cfg.syncClientToolClassNames.split(","))
                .map(String::trim)
                .collect(Collectors.toCollection(HashSet::new));
        if (this.cfg.enableHiveSync.booleanValue()) {
            this.cfg.enableMetaSync = true;
            syncClientToolClasses.add(HiveSyncTool.class.getName());
            LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
        }
        if (!this.cfg.enableMetaSync.booleanValue()) {
            return;
        }
        FileSystem fs = FSUtils.getFs(this.cfg.targetBasePath, this.hoodieSparkContext.hadoopConfiguration());
        // Write-client properties are applied last, so they override the streamer properties.
        TypedProperties metaProps = new TypedProperties();
        metaProps.putAll((Map<?, ?>)this.props);
        metaProps.putAll((Map<?, ?>)this.writeClient.getConfig().getProps());
        if (this.props.getBoolean(HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC.key(), HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
            metaProps.put(HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(this.props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()), this.props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
        }
        HashMap<String, HoodieException> failedMetaSyncs = new HashMap<String, HoodieException>();
        for (String impl : syncClientToolClasses) {
            Timer.Context syncContext = this.metrics.getMetaSyncTimerContext();
            try {
                SyncUtilHelpers.runHoodieMetaSync(impl, metaProps, this.conf, fs, this.cfg.targetBasePath, this.cfg.baseFileFormat);
            }
            catch (HoodieMetaSyncException e) {
                // Keep going so one failing tool does not block the others.
                LOG.warn("SyncTool class " + impl + " failed with exception", (Throwable)e);
                failedMetaSyncs.put(impl, e);
            }
            long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0L;
            this.metrics.updateStreamerMetaSyncMetrics(this.getSyncClassShortName(impl), metaSyncTimeMs);
        }
        if (!failedMetaSyncs.isEmpty()) {
            throw SyncUtilHelpers.getHoodieMetaSyncException(failedMetaSyncs);
        }
    }

    /**
     * Re-creates the write client from the schema provider's source and target schemas.
     * Does nothing when no schema provider is set.
     *
     * @param records records forwarded to {@code reInitWriteClient}
     * @throws IOException propagated from {@code reInitWriteClient}
     */
    private void setupWriteClient(JavaRDD<HoodieRecord> records) throws IOException {
        if (this.schemaProvider == null) {
            return;
        }
        this.reInitWriteClient(this.schemaProvider.getSourceSchema(), this.schemaProvider.getTargetSchema(), records);
    }

    /**
     * Builds a fresh {@code SparkRDDWriteClient} for the given schemas and replaces the current one.
     *
     * <p>When dropping partition columns is enabled, those columns are removed from the target schema
     * first. The schemas are registered with Spark, a write config is derived (using the record-size
     * estimate from {@code SparkSampleWritesUtils} when one is returned), the embedded timeline service
     * is created or re-used if enabled, any existing write client is closed, and the
     * {@code onInitializingHoodieWriteClient} callback is applied to the new client.
     *
     * @param sourceSchema source schema to register
     * @param targetSchema target schema used for registration and for the write config
     * @param records      records passed to the record-size estimation
     * @throws IOException declared for callers
     */
    private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD<HoodieRecord> records) throws IOException {
        LOG.info("Setting up new Hoodie Write Client");
        Schema writeSchema = targetSchema;
        if (this.isDropPartitionColumns()) {
            writeSchema = HoodieAvroUtils.removeFields(targetSchema, this.getPartitionColumns(this.props));
        }
        this.registerAvroSchemas(sourceSchema, writeSchema);

        HoodieWriteConfig baseConfig = this.getHoodieClientConfig(writeSchema);
        HoodieWriteConfig writeConfig = SparkSampleWritesUtils
                .getWriteConfigWithRecordSizeEstimate(this.hoodieSparkContext.jsc(), records, baseConfig)
                .orElse(baseConfig);

        if (writeConfig.isEmbeddedTimelineServerEnabled()) {
            if (this.embeddedTimelineService.isPresent()) {
                // Re-use the running service and point the new config at it.
                EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(this.embeddedTimelineService.get(), writeConfig);
            } else {
                this.embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService((HoodieEngineContext)this.hoodieSparkContext, writeConfig);
            }
        }

        if (this.writeClient != null) {
            this.writeClient.close();
        }
        this.writeClient = new SparkRDDWriteClient((HoodieEngineContext)this.hoodieSparkContext, writeConfig, this.embeddedTimelineService);
        this.onInitializingHoodieWriteClient.apply(this.writeClient);
    }

    /**
     * Builds the write config from a schema provider's target schema.
     *
     * @param schemaProvider schema provider; may be {@code null}, in which case no schema is used
     * @return the write config produced by {@link #getHoodieClientConfig(Schema)}
     */
    private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
        Schema targetSchema = null;
        if (schemaProvider != null) {
            targetSchema = schemaProvider.getTargetSchema();
        }
        return this.getHoodieClientConfig(targetSchema);
    }

    /**
     * Builds the {@code HoodieWriteConfig} for the write client and validates it.
     *
     * <p>The config is assembled from the streamer settings (target path and table name, duplicate
     * filtering, inline compaction, payload class and ordering field) and then overlaid with
     * {@code props}. Auto-commit is disabled and combine-before-upsert is enabled. When a schema is
     * given, the schema chosen by {@code getSchemaForWriteConfig} is set on the config. If the write
     * commit callback is on and the callback class is the Kafka or Pulsar callback, the matching
     * callback configuration is filled in.
     *
     * <p>After building, the method checks that {@code props} did not override inline compaction,
     * inline/async clustering, auto-commit or the combine flags.
     *
     * @param schema write schema; may be {@code null}
     * @return the validated write config
     * @throws IllegalArgumentException presumably raised by {@code ValidationUtils.checkArgument} when
     *                                  a check fails - confirm against that helper
     */
    private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
        final boolean combineBeforeUpsert = true;
        final boolean autoCommit = false;
        final String mismatchTemplate = "%s should be set to %s";

        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
                .withPath(this.cfg.targetBasePath)
                .combineInput(this.cfg.filterDupes, combineBeforeUpsert)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withInlineCompaction(this.cfg.isInlineCompactionEnabled())
                        .build())
                .withPayloadConfig(HoodiePayloadConfig.newBuilder()
                        .withPayloadClass(this.cfg.payloadClassName)
                        .withPayloadOrderingField(this.cfg.sourceOrderingField)
                        .build())
                .forTable(this.cfg.targetTableName)
                .withAutoCommit(autoCommit)
                .withProps(this.props);
        if (schema != null) {
            builder.withSchema(this.getSchemaForWriteConfig(schema).toString());
        }

        HoodieWriteConfig config = builder.build();
        if (config.writeCommitCallbackOn()) {
            if (HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
                HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
            }
            if (HoodieWriteCommitPulsarCallback.class.getName().equals(config.getCallbackClass())) {
                HoodieWriteCommitPulsarCallbackConfig.setCallbackPulsarConfigIfNeeded(config);
            }
        }

        // Make sure the overlaid properties did not change the settings the streamer controls.
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(this.props);
        ValidationUtils.checkArgument(
                config.inlineCompactionEnabled() == this.cfg.isInlineCompactionEnabled(),
                String.format(mismatchTemplate, HoodieCompactionConfig.INLINE_COMPACT.key(), this.cfg.isInlineCompactionEnabled()));
        ValidationUtils.checkArgument(
                config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
                String.format(mismatchTemplate, HoodieClusteringConfig.INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
        ValidationUtils.checkArgument(
                config.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(),
                String.format(mismatchTemplate, HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), clusteringConfig.isAsyncClusteringEnabled()));
        ValidationUtils.checkArgument(
                config.shouldAutoCommit() == autoCommit,
                String.format(mismatchTemplate, HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), autoCommit));
        ValidationUtils.checkArgument(
                config.shouldCombineBeforeInsert() == this.cfg.filterDupes.booleanValue(),
                String.format(mismatchTemplate, HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), this.cfg.filterDupes));
        ValidationUtils.checkArgument(
                config.shouldCombineBeforeUpsert() == combineBeforeUpsert,
                String.format(mismatchTemplate, HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), combineBeforeUpsert));
        return config;
    }

    /**
     * Chooses the schema to put into the write config.
     *
     * <p>The target schema is returned unchanged unless it is non-null, mutually compatible with
     * {@code InputBatch.NULL_SCHEMA} in both reader/writer directions, and the table already has at
     * least one completed commit. In that case the table's own Avro schema is resolved and returned.
     * If resolving throws {@code IllegalArgumentException}, a warning is logged and the target schema
     * is returned.
     *
     * @param targetSchema target schema from the schema provider; may be {@code null}
     * @return the table schema when the conditions above hold, otherwise {@code targetSchema}
     * @throws HoodieSchemaFetchException wrapping any other exception raised along the way
     */
    private Schema getSchemaForWriteConfig(Schema targetSchema) {
        try {
            if (targetSchema == null) {
                return targetSchema;
            }
            boolean targetReadsNullSchema = SchemaCompatibility.checkReaderWriterCompatibility(targetSchema, InputBatch.NULL_SCHEMA).getType()
                    == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
            if (!targetReadsNullSchema) {
                return targetSchema;
            }
            boolean nullSchemaReadsTarget = SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType()
                    == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
            if (!nullSchemaReadsTarget) {
                return targetSchema;
            }

            HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
                    .setConf(new Configuration(this.fs.getConf()))
                    .setBasePath(this.cfg.targetBasePath)
                    .setPayloadClassName(this.cfg.payloadClassName)
                    .build();
            int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
            if (totalCompleted <= 0) {
                return targetSchema;
            }

            try {
                return new TableSchemaResolver(meta).getTableAvroSchema(false);
            } catch (IllegalArgumentException e) {
                LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
                return targetSchema;
            }
        } catch (Exception e) {
            throw new HoodieSchemaFetchException("Failed to fetch schema from table", e);
        }
    }

    /**
     * Registers the schema provider's source and target schemas. Does nothing when the provider is
     * {@code null}.
     *
     * @param schemaProvider provider of the schemas to register; may be {@code null}
     */
    private void registerAvroSchemas(SchemaProvider schemaProvider) {
        if (schemaProvider == null) {
            return;
        }
        this.registerAvroSchemas(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
    }

    /**
     * Registers the given Avro schemas on the Spark configuration. Nothing is registered when the
     * source schema is {@code null}; the target schema is included only when it is non-null.
     *
     * @param sourceSchema source schema; may be {@code null}
     * @param targetSchema target schema; may be {@code null}
     */
    private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) {
        if (sourceSchema == null) {
            return;
        }
        List<Schema> toRegister = new ArrayList<Schema>();
        toRegister.add(sourceSchema);
        if (targetSchema != null) {
            toRegister.add(targetSchema);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Registering Schema: " + toRegister);
        }
        this.hoodieSparkContext.getJavaSparkContext().sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(toRegister).toList());
    }

    /**
     * Releases the resources held by this instance: the write client, the format adapter, the embedded
     * timeline service and the metrics.
     *
     * <p>The steps run in that order, and each later step runs even if an earlier one throws, so a
     * failure while closing one resource does not leave the others open. If several steps throw, the
     * exception from the last failing step is the one that propagates.
     */
    @Override
    public void close() {
        try {
            if (this.writeClient != null) {
                this.writeClient.close();
                this.writeClient = null;
            }
        } finally {
            try {
                if (this.formatAdapter != null) {
                    this.formatAdapter.close();
                }
            } finally {
                try {
                    LOG.info("Shutting down embedded timeline server");
                    if (this.embeddedTimelineService.isPresent()) {
                        this.embeddedTimelineService.get().stop();
                    }
                } finally {
                    if (this.metrics != null) {
                        this.metrics.shutdown();
                    }
                }
            }
        }
    }

    /**
     * @return the file system held in the {@code fs} field
     */
    public FileSystem getFs() {
        return fs;
    }

    /**
     * @return the properties held in the {@code props} field (the live instance, not a copy)
     */
    public TypedProperties getProps() {
        return props;
    }

    /**
     * @return the streamer configuration held in the {@code cfg} field
     */
    public HoodieStreamer.Config getCfg() {
        return cfg;
    }

    /**
     * @return the optional commits timeline held in the {@code commitsTimelineOpt} field
     */
    public Option<HoodieTimeline> getCommitsTimelineOpt() {
        return commitsTimelineOpt;
    }

    /**
     * @return the ingestion metrics held in the {@code metrics} field
     */
    public HoodieIngestionMetrics getMetrics() {
        return metrics;
    }

    /**
     * Asks the write client to schedule clustering.
     *
     * @return the result of {@code writeClient.scheduleClustering(Option.empty())}, or an empty option
     *         when no write client exists
     */
    public Option<String> getClusteringInstantOpt() {
        if (this.writeClient == null) {
            return Option.empty();
        }
        return this.writeClient.scheduleClustering(Option.empty());
    }

    /**
     * Reads the {@code HoodieTableConfig.DROP_PARTITION_COLUMNS} setting from {@code props}, falling
     * back to that config's default value.
     *
     * @return the boolean value of the setting
     */
    private Boolean isDropPartitionColumns() {
        final String key = HoodieTableConfig.DROP_PARTITION_COLUMNS.key();
        return this.props.getBoolean(key, HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue());
    }

    /**
     * Returns the partition columns as a set, obtained by splitting the comma-separated string from
     * {@code SparkKeyGenUtils.getPartitionColumns}. Entries are not trimmed.
     *
     * @param props properties passed to {@code SparkKeyGenUtils.getPartitionColumns}
     * @return the distinct column names
     */
    private Set<String> getPartitionColumns(TypedProperties props) {
        String[] columnNames = SparkKeyGenUtils.getPartitionColumns(props).split(",");
        return new HashSet<>(Arrays.asList(columnNames));
    }
}

