package org.apache.hudi.streamer;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;

/* loaded from: input_file:org/apache/hudi/streamer/HoodieFlinkStreamer.class */
public class HoodieFlinkStreamer {
    public static void main(String[] strArr) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkStreamerConfig flinkStreamerConfig = new FlinkStreamerConfig();
        JCommander jCommander = new JCommander(flinkStreamerConfig, null, strArr);
        if (flinkStreamerConfig.help.booleanValue() || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        executionEnvironment.enableCheckpointing(flinkStreamerConfig.checkpointInterval.longValue());
        executionEnvironment.getConfig().setGlobalJobParameters(flinkStreamerConfig);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        if (flinkStreamerConfig.flinkCheckPointPath != null) {
            executionEnvironment.setStateBackend(new FsStateBackend(flinkStreamerConfig.flinkCheckPointPath));
        }
        TypedProperties globalProps = DFSPropertiesConfiguration.getGlobalProps();
        globalProps.putAll(StreamerUtil.appendKafkaProps(flinkStreamerConfig));
        RowType logicalType = AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(flinkStreamerConfig)).getLogicalType();
        Configuration flinkConfig = FlinkStreamerConfig.toFlinkConfig(flinkStreamerConfig);
        long checkpointTimeout = executionEnvironment.getCheckpointConfig().getCheckpointTimeout();
        int parallelism = executionEnvironment.getParallelism();
        flinkConfig.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, checkpointTimeout);
        DataStream<RowData> uid = executionEnvironment.addSource(new FlinkKafkaConsumer(flinkStreamerConfig.kafkaTopic, new JsonRowDataDeserializationSchema(logicalType, InternalTypeInfo.of(logicalType), false, true, TimestampFormat.ISO_8601), globalProps)).name("kafka_source").uid("uid_kafka_source");
        if (flinkStreamerConfig.transformerClassNames != null && !flinkStreamerConfig.transformerClassNames.isEmpty()) {
            Option<Transformer> createTransformer = StreamerUtil.createTransformer(flinkStreamerConfig.transformerClassNames);
            if (createTransformer.isPresent()) {
                uid = createTransformer.get().apply(uid);
            }
        }
        DataStream<Object> hoodieStreamWrite = Pipelines.hoodieStreamWrite(flinkConfig, parallelism, Pipelines.bootstrap(flinkConfig, logicalType, parallelism, uid));
        if (StreamerUtil.needsAsyncCompaction(flinkConfig)) {
            Pipelines.compact(flinkConfig, hoodieStreamWrite);
        } else {
            Pipelines.clean(flinkConfig, hoodieStreamWrite);
        }
        executionEnvironment.execute(flinkStreamerConfig.targetTableName);
    }
}
