package com.logicalclocks.hsfs.beam.engine;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:com/logicalclocks/hsfs/beam/engine/BeamProducer.class */
public class BeamProducer extends PTransform<PCollection<Row>, PDone> {
    private String topic;
    private Map<String, String> properties;
    private transient Schema schema;
    private transient Schema encodedSchema;
    private Map<String, Schema> deserializedComplexFeatureSchemas;
    private List<String> primaryKeys;

    public BeamProducer(String str, Map<String, String> map, Schema schema, Schema schema2, Map<String, Schema> map2, List<String> list) {
        this.schema = schema;
        this.encodedSchema = schema2;
        this.topic = str;
        this.properties = map;
        this.deserializedComplexFeatureSchemas = map2;
        this.primaryKeys = list;
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    public PDone expand(PCollection<Row> pCollection) {
        PCollection coder = ((PCollection) pCollection.apply("Convert to avro generic record", ParDo.of(new DoFn<Row, GenericRecord>() { // from class: com.logicalclocks.hsfs.beam.engine.BeamProducer.1
            @DoFn.ProcessElement
            public void processElement(DoFn<Row, GenericRecord>.ProcessContext processContext) {
                processContext.output(AvroUtils.toGenericRecord(processContext.element(), BeamProducer.this.schema));
            }
        }))).setCoder(AvroCoder.of(GenericRecord.class, this.schema));
        if (!this.deserializedComplexFeatureSchemas.keySet().isEmpty()) {
            coder = (PCollection) coder.apply("Serialize complex features", ParDo.of(new DoFn<GenericRecord, GenericRecord>() { // from class: com.logicalclocks.hsfs.beam.engine.BeamProducer.2
                @DoFn.ProcessElement
                public void processElement(DoFn<GenericRecord, GenericRecord>.ProcessContext processContext) throws IOException {
                    GenericData.Record record = new GenericData.Record(BeamProducer.this.encodedSchema);
                    for (Schema.Field field : processContext.element().getSchema().getFields()) {
                        if (BeamProducer.this.deserializedComplexFeatureSchemas.containsKey(field.name())) {
                            GenericDatumWriter genericDatumWriter = new GenericDatumWriter((Schema) BeamProducer.this.deserializedComplexFeatureSchemas.get(field.name()));
                            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                            byteArrayOutputStream.reset();
                            BinaryEncoder binaryEncoder = new EncoderFactory().binaryEncoder(byteArrayOutputStream, null);
                            genericDatumWriter.write(field.name(), binaryEncoder);
                            binaryEncoder.flush();
                            record.put(field.name(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
                        }
                    }
                    processContext.output(record);
                }
            }));
        }
        return (PDone) ((PCollection) coder.apply("Convert To KV of primaryKey:GenericRecord", ParDo.of(new DoFn<GenericRecord, KV<String, GenericRecord>>() { // from class: com.logicalclocks.hsfs.beam.engine.BeamProducer.3
            @DoFn.ProcessElement
            public void processElement(DoFn<GenericRecord, KV<String, GenericRecord>>.ProcessContext processContext) {
                ArrayList arrayList = new ArrayList();
                Iterator it = BeamProducer.this.primaryKeys.iterator();
                while (it.hasNext()) {
                    arrayList.add(processContext.element().get((String) it.next()).toString());
                }
                processContext.output(KV.of(String.join(";", arrayList), processContext.element()));
            }
        }))).apply("Sync to online feature group kafka topic", KafkaIO.write().withBootstrapServers(this.properties.get("bootstrap.servers").toString()).withTopic(this.topic).withKeySerializer(StringSerializer.class).withValueSerializer(GenericAvroSerializer.class).withInputTimestamp().withProducerFactoryFn(map -> {
            try {
                Path path = Paths.get(this.properties.get("ssl.keystore.location"), new String[0]);
                InputStream inputStream = (InputStream) Objects.requireNonNull(BeamProducer.class.getClassLoader().getResourceAsStream(path.getFileName().toString()));
                if (!Files.exists(path, new LinkOption[0])) {
                    Files.copy(inputStream, path, StandardCopyOption.REPLACE_EXISTING);
                }
                Path path2 = Paths.get(this.properties.get("ssl.truststore.location"), new String[0]);
                InputStream inputStream2 = (InputStream) Objects.requireNonNull(BeamProducer.class.getClassLoader().getResourceAsStream(path2.getFileName().toString()));
                if (!Files.exists(path2, new LinkOption[0])) {
                    Files.copy(inputStream2, path2, StandardCopyOption.REPLACE_EXISTING);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            map.putAll(this.properties);
            return new KafkaProducer((Map<String, Object>) map);
        }));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 241518573:
                if (implMethodName.equals("lambda$expand$bbc94b5d$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/logicalclocks/hsfs/beam/engine/BeamProducer") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;)Lorg/apache/kafka/clients/producer/Producer;")) {
                    BeamProducer beamProducer = (BeamProducer) serializedLambda.getCapturedArg(0);
                    return map -> {
                        try {
                            Path path = Paths.get(this.properties.get("ssl.keystore.location"), new String[0]);
                            InputStream inputStream = (InputStream) Objects.requireNonNull(BeamProducer.class.getClassLoader().getResourceAsStream(path.getFileName().toString()));
                            if (!Files.exists(path, new LinkOption[0])) {
                                Files.copy(inputStream, path, StandardCopyOption.REPLACE_EXISTING);
                            }
                            Path path2 = Paths.get(this.properties.get("ssl.truststore.location"), new String[0]);
                            InputStream inputStream2 = (InputStream) Objects.requireNonNull(BeamProducer.class.getClassLoader().getResourceAsStream(path2.getFileName().toString()));
                            if (!Files.exists(path2, new LinkOption[0])) {
                                Files.copy(inputStream2, path2, StandardCopyOption.REPLACE_EXISTING);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        map.putAll(this.properties);
                        return new KafkaProducer((Map<String, Object>) map);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
