/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter;
import org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.DataModificationInfos;

public class HoodieTableSink
implements DynamicTableSink,
SupportsPartitioning,
SupportsOverwrite,
SupportsRowLevelDeleteAdapter,
SupportsRowLevelUpdateAdapter {
    private final Configuration conf;
    private final ResolvedSchema schema;
    private boolean overwrite = false;

    public HoodieTableSink(Configuration conf, ResolvedSchema schema) {
        this.conf = conf;
        this.schema = schema;
    }

    public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite) {
        this.conf = conf;
        this.schema = schema;
        this.overwrite = overwrite;
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        return dataStream -> {
            long ckpTimeout = dataStream.getExecutionEnvironment().getCheckpointConfig().getCheckpointTimeout();
            this.conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
            OptionsInference.setupSinkTasks(this.conf, dataStream.getExecutionConfig().getParallelism());
            OptionsInference.setupClientId(this.conf);
            RowType rowType = (RowType)((DataType)this.schema.toSinkRowDataType().notNull()).getLogicalType();
            if (OptionsResolver.isBulkInsertOperation(this.conf)) {
                if (!context.isBounded()) {
                    throw new HoodieException("The bulk insert should be run in batch execution mode.");
                }
                return Pipelines.bulkInsert(this.conf, rowType, (DataStream<RowData>)dataStream);
            }
            if (OptionsResolver.isAppendMode(this.conf)) {
                this.conf.set((ConfigOption)FlinkOptions.COMPACTION_SCHEDULE_ENABLED, (Object)false);
                DataStream<Object> pipeline = Pipelines.append(this.conf, rowType, (DataStream<RowData>)dataStream);
                if (OptionsResolver.needsAsyncClustering(this.conf)) {
                    return Pipelines.cluster(this.conf, rowType, pipeline);
                }
                if (OptionsResolver.isLazyFailedWritesCleanPolicy(this.conf)) {
                    return Pipelines.clean(this.conf, pipeline);
                }
                return Pipelines.dummySink(pipeline);
            }
            DataStream<HoodieFlinkInternalRow> hoodieRecordDataStream = Pipelines.bootstrap(this.conf, rowType, (DataStream<RowData>)dataStream, context.isBounded(), this.overwrite);
            DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(this.conf, rowType, hoodieRecordDataStream);
            if (OptionsResolver.needsAsyncCompaction(this.conf)) {
                if (context.isBounded()) {
                    this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
                }
                return Pipelines.compact(this.conf, pipeline);
            }
            return Pipelines.clean(this.conf, pipeline);
        };
    }

    @VisibleForTesting
    public Configuration getConf() {
        return this.conf;
    }

    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        if (this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
            return ChangelogModes.FULL;
        }
        return ChangelogModes.UPSERT;
    }

    public DynamicTableSink copy() {
        return new HoodieTableSink(this.conf, this.schema, this.overwrite);
    }

    public String asSummaryString() {
        return "HoodieTableSink";
    }

    public void applyStaticPartition(Map<String, String> partitions) {
        if (this.overwrite && !partitions.isEmpty()) {
            this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
        }
    }

    public void applyOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
        if (OptionsResolver.overwriteDynamicPartition(this.conf)) {
            this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
        } else {
            this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value());
        }
    }

    public SupportsRowLevelDeleteAdapter.RowLevelDeleteInfoAdapter applyRowLevelDelete() {
        this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.DELETE.value());
        return DataModificationInfos.DEFAULT_DELETE_INFO;
    }

    public SupportsRowLevelUpdateAdapter.RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> list) {
        this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.UPSERT.value());
        return DataModificationInfos.DEFAULT_UPDATE_INFO;
    }
}

