package com.logicalclocks.hsfs.engine;

import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.StreamFeatureGroup;
import com.logicalclocks.hsfs.metadata.DatasetApi;
import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.util.AntPathMatcher;

/* loaded from: input_file:com/logicalclocks/hsfs/engine/Engine.class */
/**
 * Plain-Java engine that serializes POJOs to Avro and publishes them to the Kafka topic
 * of a stream feature group.
 *
 * <p>The class is a lazily created singleton; obtain it through {@link #getInstance()}.
 *
 * @param <T> type of the POJOs accepted by {@link #writeStream}
 */
public class Engine<T> extends EngineBase {
    /** Name of the static field that marks a class as carrying its own Avro schema. */
    private static final String AVRO_SCHEMA_FIELD = "SCHEMA$";
    /** Separator used to locate the file name inside a dataset path. */
    private static final String PATH_SEPARATOR = "/";
    /** Serializer used for both Kafka keys and values. */
    private static final String BYTE_ARRAY_SERIALIZER =
        "org.apache.kafka.common.serialization.ByteArraySerializer";

    // Kept as a raw type because getInstance() exposes the raw type to callers.
    private static Engine INSTANCE = null;
    private final FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

    /**
     * Returns the shared engine, creating it on first use.
     *
     * @return the singleton engine instance
     * @throws FeatureStoreException declared by the constructor
     */
    public static synchronized Engine getInstance() throws FeatureStoreException {
        if (INSTANCE == null) {
            INSTANCE = new Engine();
        }
        return INSTANCE;
    }

    private Engine() throws FeatureStoreException {
    }

    /**
     * Validates every element of {@code list} against the feature group's Avro schema,
     * converts it to an Avro record and sends it through a Kafka producer that is
     * flushed and closed before the method returns.
     *
     * @param streamFeatureGroup feature group that supplies the schemas and Kafka settings
     * @param list POJOs to publish
     * @param map extra options merged into the Kafka configuration; may be {@code null}
     * @return the same {@code list} that was passed in
     * @throws FeatureStoreException if the schemas or the Kafka configuration cannot be obtained
     * @throws IOException if encoding a complex feature or fetching a key store fails
     * @throws SchemaValidationException if a POJO's reflected schema is not compatible
     * @throws NoSuchFieldException if a schema field has no matching field on the POJO
     * @throws IllegalAccessException if a POJO field cannot be read reflectively
     */
    // KafkaProducer stays raw: the record type returned by KafkaRecordSerializer#serialize
    // is declared outside this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public List<T> writeStream(StreamFeatureGroup streamFeatureGroup, List<T> list, Map<String, String> map)
            throws FeatureStoreException, IOException, SchemaValidationException, NoSuchFieldException,
            IllegalAccessException {
        Map<String, Schema> complexFeatureSchemas = new HashMap<>();
        for (String featureName : streamFeatureGroup.getComplexFeatures()) {
            complexFeatureSchemas.put(featureName,
                new Schema.Parser().parse(streamFeatureGroup.getFeatureAvroSchema(featureName)));
        }
        Schema encodedSchema = new Schema.Parser().parse(streamFeatureGroup.getEncodedAvroSchema());

        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BYTE_ARRAY_SERIALIZER);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BYTE_ARRAY_SERIALIZER);
        properties.putAll(getKafkaConfig(streamFeatureGroup, map));

        KafkaRecordSerializer kafkaRecordSerializer = new KafkaRecordSerializer(streamFeatureGroup);
        // try-with-resources closes the producer on every path and keeps the primary
        // exception if close() fails as well.
        try (KafkaProducer kafkaProducer = new KafkaProducer(properties)) {
            // The validation schema does not depend on the element, so parse it once.
            Schema featureGroupSchema = new Schema.Parser().parse(streamFeatureGroup.getAvroSchema());
            for (T element : list) {
                validatePojoAgainstSchema(element, featureGroupSchema);
                GenericRecord record = pojoToAvroRecord(element, encodedSchema, complexFeatureSchemas);
                kafkaProducer.send(kafkaRecordSerializer.serialize(record));
            }
            kafkaProducer.flush();
        }
        return list;
    }

    /**
     * Copies the field values of {@code obj} into a new Avro record built on {@code schema}.
     *
     * <p>If the POJO's class declares a static {@code SCHEMA$} field (presumably an
     * Avro-generated class), the field names are taken from that schema. Otherwise every
     * non-static, non-synthetic field declared directly on the POJO's class is copied.
     * Fields whose name is a key of {@code map} are Avro-binary encoded with the mapped schema.
     *
     * @param obj POJO to convert
     * @param schema schema of the record to create
     * @param map complex feature name to the schema used to binary-encode its value
     * @return the populated record
     * @throws NoSuchFieldException if {@code SCHEMA$} names a field the POJO class does not declare
     * @throws IOException if binary encoding of a complex feature fails
     * @throws IllegalAccessException if a field value cannot be read
     */
    public GenericRecord pojoToAvroRecord(Object obj, Schema schema, Map<String, Schema> map)
            throws NoSuchFieldException, IOException, IllegalAccessException {
        GenericData.Record record = new GenericData.Record(schema);
        Class<?> pojoClass = obj.getClass();
        Field[] declaredFields = pojoClass.getDeclaredFields();

        boolean carriesOwnSchema = false;
        for (Field declared : declaredFields) {
            if (AVRO_SCHEMA_FIELD.equals(declared.getName())) {
                carriesOwnSchema = true;
                break;
            }
        }

        if (carriesOwnSchema) {
            Field schemaField = pojoClass.getDeclaredField(AVRO_SCHEMA_FIELD);
            schemaField.setAccessible(true);
            Schema pojoSchema = (Schema) schemaField.get(null);
            for (Schema.Field avroField : pojoSchema.getFields()) {
                String name = avroField.name();
                Field pojoField = pojoClass.getDeclaredField(name);
                pojoField.setAccessible(true);
                populateAvroRecord(record, name, pojoField.get(obj), map);
            }
        } else {
            // Plain POJO: the previous code iterated an always-empty list here and returned
            // an unpopulated record. Walk the instance fields instead.
            for (Field pojoField : declaredFields) {
                if (pojoField.isSynthetic() || Modifier.isStatic(pojoField.getModifiers())) {
                    continue;
                }
                pojoField.setAccessible(true);
                populateAvroRecord(record, pojoField.getName(), pojoField.get(obj), map);
            }
        }
        return record;
    }

    /**
     * Stores one value in {@code genericRecord}. Values whose name is a key of {@code map}
     * are first Avro-binary encoded with the mapped schema and stored as a {@link ByteBuffer};
     * all other values are stored unchanged.
     *
     * @throws IOException if binary encoding fails
     */
    private void populateAvroRecord(GenericRecord genericRecord, String str, Object obj, Map<String, Schema> map)
            throws IOException {
        if (!map.containsKey(str)) {
            genericRecord.put(str, obj);
            return;
        }
        GenericDatumWriter<Object> datumWriter = new GenericDatumWriter<>(map.get(str));
        // An in-memory stream holds no resource, so it needs no reset/flush/close.
        ByteArrayOutputStream encodedBytes = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = new EncoderFactory().binaryEncoder(encodedBytes, null);
        datumWriter.write(obj, binaryEncoder);
        binaryEncoder.flush();
        genericRecord.put(str, ByteBuffer.wrap(encodedBytes.toByteArray()));
    }

    /**
     * Checks, using Avro's "can read" strategy, {@code schema} against the schema that
     * reflection derives from the POJO's class.
     *
     * @throws SchemaValidationException if the validation fails
     */
    private void validatePojoAgainstSchema(Object obj, Schema schema) throws SchemaValidationException {
        Schema pojoSchema = ReflectData.get().getSchema(obj.getClass());
        new SchemaValidatorBuilder()
            .canReadStrategy()
            .validateAll()
            .validate(schema, Collections.singletonList(pojoSchema));
    }

    /**
     * Downloads the dataset file at {@code str} into the JVM temp directory
     * ({@code java.io.tmpdir}) under the same file name.
     *
     * @param str dataset path of the file; {@code null} or empty is returned unchanged
     * @return the local path of the copy, or {@code str} itself when it is null or empty
     * @throws IOException if the local file cannot be written
     * @throws FeatureStoreException declared for the dataset read
     */
    @Override
    public String addFile(String str) throws IOException, FeatureStoreException {
        if (Strings.isNullOrEmpty(str)) {
            return str;
        }
        int separatorIndex = str.lastIndexOf(PATH_SEPARATOR);
        // Keep the leading separator of the last segment; a bare file name gets one prepended
        // instead of failing in substring(-1).
        String fileName = separatorIndex >= 0 ? str.substring(separatorIndex) : PATH_SEPARATOR + str;
        String localPath = System.getProperty("java.io.tmpdir") + fileName;
        try (FileOutputStream fileOutputStream = new FileOutputStream(localPath)) {
            fileOutputStream.write(DatasetApi.readContent(str, this.featureGroupUtils.getDatasetType(str)));
        }
        return localPath;
    }

    /**
     * Builds the Kafka client options for a feature group: looks up the Kafka storage
     * connector, copies its trust store and key store to local files, then overlays the
     * caller's options. Idempotence is always set to {@code "false"}, even if the caller's
     * options say otherwise.
     *
     * @param featureGroupBase feature group whose feature store owns the connector
     * @param map caller-supplied options; may be {@code null}
     * @return the merged Kafka options
     * @throws FeatureStoreException if the connector cannot be obtained
     * @throws IOException if a store file cannot be copied locally
     */
    @Override
    public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroupBase, Map<String, String> map)
            throws FeatureStoreException, IOException {
        boolean restEndpointConfigured =
            System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS);
        boolean internalKafkaRequested =
            map != null && Boolean.parseBoolean(map.getOrDefault("internal_kafka", "false"));
        // NOTE(review): presumably selects the externally reachable connector; confirm
        // against StorageConnectorApi#getKafkaStorageConnector.
        boolean external = !restEndpointConfigured && !internalKafkaRequested;

        StorageConnector.KafkaConnector kafkaStorageConnector =
            this.storageConnectorApi.getKafkaStorageConnector(featureGroupBase.getFeatureStore(), external);
        kafkaStorageConnector.setSslTruststoreLocation(addFile(kafkaStorageConnector.getSslTruststoreLocation()));
        kafkaStorageConnector.setSslKeystoreLocation(addFile(kafkaStorageConnector.getSslKeystoreLocation()));

        Map<String, String> kafkaOptions = kafkaStorageConnector.kafkaOptions();
        if (map != null) {
            kafkaOptions.putAll(map);
        }
        kafkaOptions.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
        return kafkaOptions;
    }
}
