/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.processor.maxwell;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.Serializable;
import java.util.Locale;
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

public class MaxwellJsonKafkaSourcePostProcessor
extends JsonKafkaSourcePostProcessor {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final Option<String> databaseRegex;
    private final String tableRegex;
    private static final String DATABASE = "database";
    private static final String TABLE = "table";
    private static final String DATA = "data";
    private static final String OPERATION_TYPE = "type";
    private static final String TS = "ts";
    private static final String INSERT = "insert";
    private static final String UPDATE = "update";
    private static final String DELETE = "delete";

    public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
        super(props);
        this.databaseRegex = Option.ofNullable(props.getString(Config.DATABASE_NAME_REGEX_PROP.key(), null));
        this.tableRegex = props.getString(Config.TABLE_NAME_REGEX_PROP.key());
    }

    @Override
    public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
        return maxwellJsonRecords.map((Function & Serializable)record -> {
            String table;
            JsonNode inputJson = MAPPER.readTree(record);
            String database = inputJson.get(DATABASE).textValue();
            if (this.isTargetTable(database, table = inputJson.get(TABLE).textValue())) {
                ObjectNode result = (ObjectNode)inputJson.get(DATA);
                String type = inputJson.get(OPERATION_TYPE).textValue();
                if (INSERT.equals(type) || UPDATE.equals(type)) {
                    result.put("_hoodie_is_deleted", false);
                    return result.toString();
                }
                if (DELETE.equals(type)) {
                    return this.processDelete(inputJson, result);
                }
                return null;
            }
            return null;
        }).filter(Objects::nonNull);
    }

    private String processDelete(JsonNode inputJson, ObjectNode result) {
        result.put("_hoodie_is_deleted", true);
        PreCombineFieldType preCombineFieldType = PreCombineFieldType.valueOf(this.props.getString(Config.PRECOMBINE_FIELD_TYPE_PROP.key(), Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toUpperCase(Locale.ROOT));
        if (!preCombineFieldType.equals((Object)PreCombineFieldType.NON_TIMESTAMP)) {
            String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
            long ts = inputJson.get(TS).longValue();
            if (preCombineFieldType.equals((Object)PreCombineFieldType.DATE_STRING)) {
                String timeFormat = this.props.getString(Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), Config.PRECOMBINE_FIELD_FORMAT_PROP.defaultValue());
                result.put(preCombineField, DateTimeUtils.formatUnixTimestamp(ts, timeFormat));
            } else if (preCombineFieldType.equals((Object)PreCombineFieldType.EPOCHMILLISECONDS)) {
                result.put(preCombineField, ts * 1000L);
            } else if (preCombineFieldType.equals((Object)PreCombineFieldType.UNIX_TIMESTAMP)) {
                result.put(preCombineField, ts);
            } else {
                throw new HoodieSourcePostProcessException("Unsupported preCombine time format " + (Object)((Object)preCombineFieldType));
            }
        }
        return result.toString();
    }

    private boolean isTargetTable(String database, String table) {
        if (!this.databaseRegex.isPresent()) {
            return Pattern.matches(this.tableRegex, table);
        }
        return Pattern.matches(this.databaseRegex.get(), database) && Pattern.matches(this.tableRegex, table);
    }

    public static class Config {
        public static final ConfigProperty<String> DATABASE_NAME_REGEX_PROP = ConfigProperty.key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.database.regex").noDefaultValue().withDocumentation("Database name regex.");
        public static final ConfigProperty<String> TABLE_NAME_REGEX_PROP = ConfigProperty.key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.table.regex").noDefaultValue().withDocumentation("Table name regex.");
        public static final ConfigProperty<String> PRECOMBINE_FIELD_TYPE_PROP = ConfigProperty.key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.type").defaultValue(PreCombineFieldType.DATE_STRING.toString()).withDocumentation("Data type of the preCombine field. could be NON_TIMESTAMP, DATE_STRING,UNIX_TIMESTAMP or EPOCHMILLISECONDS. DATE_STRING by default ");
        public static final ConfigProperty<String> PRECOMBINE_FIELD_FORMAT_PROP = ConfigProperty.key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.format").defaultValue("yyyy-MM-dd HH:mm:ss").withDocumentation("When the preCombine filed is in DATE_STRING format, use should tell hoodiewhat format it is. 'yyyy-MM-dd HH:mm:ss' by default");
    }
}

