/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.com.beust.jcommander.IStringConverter;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.com.beust.jcommander.ParameterException;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.operator.InstantGenerateOperator;
import org.apache.hudi.operator.KeyedWriteProcessFunction;
import org.apache.hudi.operator.KeyedWriteProcessOperator;
import org.apache.hudi.sink.CommitSink;
import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
import org.apache.hudi.util.StreamerUtil;

public class HoodieFlinkStreamer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Config cfg = new Config();
        JCommander cmd = new JCommander((Object)cfg, null, args);
        if (cfg.help.booleanValue() || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        env.enableCheckpointing(cfg.checkpointInterval.longValue());
        env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)cfg);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.disableOperatorChaining();
        if (cfg.flinkCheckPointPath != null) {
            env.setStateBackend((AbstractStateBackend)new FsStateBackend(cfg.flinkCheckPointPath));
        }
        TypedProperties props = StreamerUtil.getProps(cfg);
        props.put("bootstrap.servers", cfg.kafkaBootstrapServers);
        props.put("group.id", cfg.kafkaGroupId);
        props.put("hoodie.datasource.write.payload.class", cfg.payloadClassName);
        props.put("hoodie.datasource.write.precombine.field", cfg.sourceOrderingField);
        SingleOutputStreamOperator inputRecords = env.addSource(new FlinkKafkaConsumer(cfg.kafkaTopic, new SimpleStringSchema(), (Properties)props)).filter(Objects::nonNull).map((MapFunction)new JsonStringToHoodieRecordMapFunction(props)).name("kafka_to_hudi_record").uid("kafka_to_hudi_record_uid");
        inputRecords.transform("InstantGenerateOperator", TypeInformation.of(HoodieRecord.class), (OneInputStreamOperator)new InstantGenerateOperator()).name("instant_generator").uid("instant_generator_id").setParallelism(1).keyBy(HoodieRecord::getPartitionPath).transform("WriteProcessOperator", TypeInformation.of((TypeHint)new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>(){}), (OneInputStreamOperator)new KeyedWriteProcessOperator(new KeyedWriteProcessFunction())).name("write_process").uid("write_process_uid").setParallelism(env.getParallelism()).addSink((SinkFunction)new CommitSink()).name("commit_sink").uid("commit_sink_uid").setParallelism(1);
        env.execute(cfg.targetTableName);
    }

    private static class OperationConverter
    implements IStringConverter<WriteOperationType> {
        private OperationConverter() {
        }

        @Override
        public WriteOperationType convert(String value) throws ParameterException {
            return WriteOperationType.valueOf(value);
        }
    }

    public static class Config
    extends Configuration {
        @Parameter(names={"--kafka-topic"}, description="kafka topic", required=true)
        public String kafkaTopic;
        @Parameter(names={"--kafka-group-id"}, description="kafka consumer group id", required=true)
        public String kafkaGroupId;
        @Parameter(names={"--kafka-bootstrap-servers"}, description="kafka bootstrap.servers", required=true)
        public String kafkaBootstrapServers;
        @Parameter(names={"--flink-checkpoint-path"}, description="flink checkpoint path")
        public String flinkCheckPointPath;
        @Parameter(names={"--flink-block-retry-times"}, description="Times to retry when latest instant has not completed")
        public String blockRetryTime = "10";
        @Parameter(names={"--flink-block-retry-interval"}, description="Seconds between two tries when latest instant has not completed")
        public String blockRetryInterval = "1";
        @Parameter(names={"--target-base-path"}, description="base path for the target hoodie table. (Will be created if did not exist first time around. If exists, expected to be a hoodie table)", required=true)
        public String targetBasePath;
        @Parameter(names={"--target-table"}, description="name of the target table in Hive", required=true)
        public String targetTableName;
        @Parameter(names={"--table-type"}, description="Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required=true)
        public String tableType;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, referto individual classes, for supported properties.")
        public String propsFilePath = "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter.")
        public List<String> configs = new ArrayList<String>();
        @Parameter(names={"--source-ordering-field"}, description="Field within source record to decide how to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
        public String sourceOrderingField = "ts";
        @Parameter(names={"--payload-class"}, description="subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
        public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
        @Parameter(names={"--op"}, description="Takes one of these values : UPSERT (default), INSERT (use when input is purely new data/inserts to gain speed)", converter=OperationConverter.class)
        public WriteOperationType operation = WriteOperationType.UPSERT;
        @Parameter(names={"--filter-dupes"}, description="Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
        public Boolean filterDupes = false;
        @Parameter(names={"--commit-on-errors"}, description="Commit even when some records failed to be written")
        public Boolean commitOnErrors = false;
        @Parameter(names={"--checkpoint-interval"}, description="Flink checkpoint interval.")
        public Long checkpointInterval = 5000L;
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
    }
}

