/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes;

/**
 * Flink {@link DynamicTableSink} that writes {@link RowData} into a Hudi table.
 *
 * <p>The sink is driven by a {@link Configuration} and the table's {@link ResolvedSchema}.
 * The write pipeline that is assembled depends on the configured
 * {@link FlinkOptions#OPERATION} and on the {@link OptionsResolver} checks performed in
 * {@link #getSinkRuntimeProvider(DynamicTableSink.Context)}.
 *
 * <p>Several methods of this class mutate the {@link Configuration} instance that was
 * handed to the constructor.
 */
public class HoodieTableSink
implements DynamicTableSink,
SupportsPartitioning,
SupportsOverwrite {
    /** Write configuration; mutated in place by several methods of this class. */
    private final Configuration conf;
    /** Schema of the target table, used to derive the sink row type. */
    private final ResolvedSchema schema;
    /** Whether the planner requested overwrite semantics for this sink. */
    private boolean overwrite = false;

    /**
     * Creates a sink with overwrite disabled.
     *
     * @param conf   the write configuration
     * @param schema the resolved schema of the target table
     */
    public HoodieTableSink(Configuration conf, ResolvedSchema schema) {
        this(conf, schema, false);
    }

    /**
     * Creates a sink with an explicit overwrite flag.
     *
     * @param conf      the write configuration
     * @param schema    the resolved schema of the target table
     * @param overwrite initial value of the overwrite flag
     */
    public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite) {
        this.conf = conf;
        this.schema = schema;
        this.overwrite = overwrite;
    }

    /**
     * Builds the runtime provider that wires the incoming data stream into a Hudi write
     * pipeline.
     *
     * <p>The returned provider, when invoked with the input stream:
     * <ol>
     *   <li>copies the job's checkpoint timeout into
     *       {@link FlinkOptions#WRITE_COMMIT_ACK_TIMEOUT};</li>
     *   <li>passes the stream's parallelism to {@link OptionsInference#setupSinkTasks};</li>
     *   <li>returns the bulk-insert pipeline if the configured operation is
     *       {@link WriteOperationType#BULK_INSERT};</li>
     *   <li>otherwise returns the append pipeline (followed by clustering or a dummy sink)
     *       if {@link OptionsResolver#isAppendMode} holds;</li>
     *   <li>otherwise bootstraps the stream, applies the stream write, and finishes with
     *       either the compaction or the clean pipeline.</li>
     * </ol>
     *
     * @param context the sink context; only {@code isBounded()} is consulted
     * @return the provider that assembles the pipeline
     */
    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        // NOTE(review): decompiled output - the functional interface this lambda targets
        // is not visible in this file; confirm against the original source.
        return dataStream -> {
            // Propagate the checkpoint timeout into the write configuration.
            long ckpTimeout = dataStream.getExecutionEnvironment().getCheckpointConfig().getCheckpointTimeout();
            this.conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
            // Hand the stream's parallelism to the sink task setup.
            OptionsInference.setupSinkTasks(this.conf, dataStream.getExecutionConfig().getParallelism());
            RowType rowType = (RowType)((DataType)this.schema.toSinkRowDataType().notNull()).getLogicalType();

            // Bulk insert has its own dedicated pipeline.
            String writeOperation = this.conf.get(FlinkOptions.OPERATION);
            if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
                return Pipelines.bulkInsert(this.conf, rowType, (DataStream<RowData>)dataStream);
            }

            // Append mode: append pipeline, then clustering if requested, else a dummy sink.
            if (OptionsResolver.isAppendMode(this.conf)) {
                DataStream<Object> pipeline = Pipelines.append(this.conf, rowType, (DataStream<RowData>)dataStream, context.isBounded());
                if (OptionsResolver.needsAsyncClustering(this.conf)) {
                    return Pipelines.cluster(this.conf, rowType, pipeline);
                }
                return Pipelines.dummySink(pipeline);
            }

            // Default path: bootstrap, then stream write.
            DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(this.conf, rowType, (DataStream<RowData>)dataStream, context.isBounded(), this.overwrite);
            DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(this.conf, hoodieRecordDataStream);
            if (OptionsResolver.needsAsyncCompaction(this.conf)) {
                // For bounded input the async-compaction flag is switched off before
                // the compaction pipeline is built.
                if (context.isBounded()) {
                    this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
                }
                return Pipelines.compact(this.conf, pipeline);
            }
            return Pipelines.clean(this.conf, pipeline);
        };
    }

    /**
     * Exposes the underlying configuration.
     *
     * @return the same (mutable) configuration instance this sink operates on
     */
    @VisibleForTesting
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Selects the changelog mode accepted by this sink.
     *
     * @param changelogMode the mode requested by the planner; not consulted
     * @return {@link ChangelogModes#FULL} if {@link FlinkOptions#CHANGELOG_ENABLED} is set,
     *         otherwise {@link ChangelogModes#UPSERT}
     */
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        if (this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
            return ChangelogModes.FULL;
        }
        return ChangelogModes.UPSERT;
    }

    /**
     * Creates a new sink carrying the same configuration, schema and overwrite flag.
     *
     * @return the new sink instance
     */
    public DynamicTableSink copy() {
        // NOTE(review): the Configuration instance is shared with the copy, not cloned,
        // so configuration changes made through either sink are visible to both.
        return new HoodieTableSink(this.conf, this.schema, this.overwrite);
    }

    /**
     * Returns the short description of this sink.
     *
     * @return the constant string {@code "HoodieTableSink"}
     */
    public String asSummaryString() {
        return "HoodieTableSink";
    }

    /**
     * Narrows an overwrite to the statically specified partitions.
     *
     * <p>Only has an effect when the overwrite flag is already set and at least one static
     * partition is given; the operation is then set to
     * {@link WriteOperationType#INSERT_OVERWRITE}.
     *
     * @param partitions the static partition spec (partition key to value)
     */
    public void applyStaticPartition(Map<String, String> partitions) {
        if (this.overwrite && !partitions.isEmpty()) {
            this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
        }
    }

    /**
     * Records whether existing data should be overwritten.
     *
     * <p>When {@code overwrite} is {@code true} the operation is set to
     * {@link WriteOperationType#INSERT_OVERWRITE_TABLE}; a subsequent
     * {@link #applyStaticPartition(Map)} call with a non-empty partition spec replaces it
     * with {@link WriteOperationType#INSERT_OVERWRITE}. When {@code overwrite} is
     * {@code false} the configured operation is left untouched.
     *
     * @param overwrite whether existing data should be overwritten
     */
    public void applyOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
        // Only switch to an overwriting operation when overwrite was actually requested;
        // otherwise a 'false' flag would still force the table to be overwritten.
        if (overwrite) {
            this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value());
        }
    }
}

