/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.beam.engine;

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
import com.logicalclocks.hsfs.beam.engine.BeamKafkaProducer;
import com.logicalclocks.hsfs.beam.engine.GenericAvroSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.serialization.StringSerializer;

public class BeamProducer
extends PTransform<PCollection<Row>, PDone> {
    private String topic;
    private Map<String, String> properties;
    private transient Schema schema;
    private transient Schema encodedSchema;
    private Map<String, Schema> deserializedComplexFeatureSchemas;
    private List<String> primaryKeys;
    private final Map<String, byte[]> headerMap = new HashMap<String, byte[]>();

    public BeamProducer(String topic, Map<String, String> properties, Schema schema, Schema encodedSchema, Map<String, Schema> deserializedComplexFeatureSchemas, List<String> primaryKeys, StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException {
        this.schema = schema;
        this.encodedSchema = encodedSchema;
        this.topic = topic;
        this.properties = properties;
        this.deserializedComplexFeatureSchemas = deserializedComplexFeatureSchemas;
        this.primaryKeys = primaryKeys;
        this.headerMap.put("projectId", String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
        this.headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8));
        this.headerMap.put("subjectId", String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
    }

    public PDone expand(PCollection<Row> input) {
        PCollection featureGroupAvroRecord = ((PCollection)input.apply("Convert to avro generic record", (PTransform)ParDo.of((DoFn)new DoFn<Row, GenericRecord>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                GenericRecord genericRecord = AvroUtils.toGenericRecord((Row)((Row)c.element()), (Schema)BeamProducer.this.schema);
                c.output((Object)genericRecord);
            }
        }))).setCoder((Coder)AvroCoder.of(GenericRecord.class, (Schema)this.schema));
        if (!this.deserializedComplexFeatureSchemas.keySet().isEmpty()) {
            featureGroupAvroRecord = (PCollection)featureGroupAvroRecord.apply("Serialize complex features", (PTransform)ParDo.of((DoFn)new DoFn<GenericRecord, GenericRecord>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) throws IOException {
                    GenericData.Record encodedRecord = new GenericData.Record(BeamProducer.this.encodedSchema);
                    for (Schema.Field field : ((GenericRecord)c.element()).getSchema().getFields()) {
                        if (!BeamProducer.this.deserializedComplexFeatureSchemas.containsKey(field.name())) continue;
                        GenericDatumWriter complexFeatureDatumWriter = new GenericDatumWriter((Schema)BeamProducer.this.deserializedComplexFeatureSchemas.get(field.name()));
                        ByteArrayOutputStream complexFeatureByteArrayOutputStream = new ByteArrayOutputStream();
                        complexFeatureByteArrayOutputStream.reset();
                        BinaryEncoder complexFeatureBinaryEncoder = new EncoderFactory().binaryEncoder((OutputStream)complexFeatureByteArrayOutputStream, null);
                        complexFeatureDatumWriter.write((Object)field.name(), (Encoder)complexFeatureBinaryEncoder);
                        complexFeatureBinaryEncoder.flush();
                        encodedRecord.put(field.name(), (Object)ByteBuffer.wrap(complexFeatureByteArrayOutputStream.toByteArray()));
                    }
                    c.output((Object)encodedRecord);
                }
            }));
        }
        return (PDone)((PCollection)featureGroupAvroRecord.apply("Convert To KV of primaryKey:GenericRecord", (PTransform)ParDo.of((DoFn)new DoFn<GenericRecord, KV<String, GenericRecord>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                ArrayList<String> primaryKeyValues = new ArrayList<String>();
                for (String primaryKey : BeamProducer.this.primaryKeys) {
                    primaryKeyValues.add(((GenericRecord)c.element()).get(primaryKey).toString());
                }
                c.output((Object)KV.of((Object)String.join((CharSequence)";", primaryKeyValues), (Object)c.element()));
            }
        }))).apply("Sync to online feature group kafka topic", (PTransform)KafkaIO.write().withBootstrapServers(this.properties.get("bootstrap.servers").toString()).withTopic(this.topic).withKeySerializer(StringSerializer.class).withValueSerializer(GenericAvroSerializer.class).withInputTimestamp().withProducerFactoryFn((SerializableFunction & Serializable)props -> {
            try {
                Path keyStorePath = Paths.get(this.properties.get("ssl.keystore.location"), new String[0]);
                InputStream keyStoreStream = Objects.requireNonNull(BeamProducer.class.getClassLoader().getResourceAsStream(keyStorePath.getFileName().toString()));
                if (!Files.exists(keyStorePath, new LinkOption[0])) {
                    Files.copy(keyStoreStream, keyStorePath, StandardCopyOption.REPLACE_EXISTING);
                }
                Path trustStorePath = Paths.get(this.properties.get("ssl.truststore.location"), new String[0]);
                InputStream trustStoreStream = Objects.requireNonNull(BeamProducer.class.getClassLoader().getResourceAsStream(trustStorePath.getFileName().toString()));
                if (!Files.exists(trustStorePath, new LinkOption[0])) {
                    Files.copy(trustStoreStream, trustStorePath, StandardCopyOption.REPLACE_EXISTING);
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            props.putAll(this.properties);
            BeamKafkaProducer producer = new BeamKafkaProducer((Map)props);
            producer.setHeaderMap(this.headerMap);
            return producer;
        }));
    }
}

