/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.StreamFeatureGroup;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.engine.KafkaRecordSerializer;
import com.logicalclocks.hsfs.metadata.DatasetApi;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Engine<T>
extends EngineBase {
    private static Engine INSTANCE = null;
    private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

    public static synchronized Engine getInstance() throws FeatureStoreException {
        if (INSTANCE == null) {
            INSTANCE = new Engine();
        }
        return INSTANCE;
    }

    private Engine() throws FeatureStoreException {
    }

    public List<T> writeStream(StreamFeatureGroup streamFeatureGroup, List<T> featureData, Map<String, String> writeOptions) throws FeatureStoreException, IOException, SchemaValidationException, NoSuchFieldException, IllegalAccessException {
        HashMap<String, Schema> complexFeatureSchemas = new HashMap<String, Schema>();
        for (String featureName : streamFeatureGroup.getComplexFeatures()) {
            complexFeatureSchemas.put(featureName.toString(), new Schema.Parser().parse(streamFeatureGroup.getFeatureAvroSchema(featureName.toString())));
        }
        Schema featureGroupSchema = new Schema.Parser().parse(streamFeatureGroup.getAvroSchema());
        Schema encodedFeatureGroupSchema = new Schema.Parser().parse(streamFeatureGroup.getEncodedAvroSchema());
        Properties kafkaProps = new Properties();
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        kafkaProps.putAll(this.getKafkaConfig(streamFeatureGroup, writeOptions));
        KafkaRecordSerializer kafkaRecordSerializer = new KafkaRecordSerializer(streamFeatureGroup, Long.valueOf(featureData.size()));
        try (KafkaProducer producer = new KafkaProducer(kafkaProps);){
            for (T input : featureData) {
                GenericRecord genericRecord = this.convertPojoToGenericRecord(input, featureGroupSchema, encodedFeatureGroupSchema, complexFeatureSchemas);
                ProducerRecord<byte[], byte[]> record = kafkaRecordSerializer.serialize(genericRecord);
                producer.send(record);
            }
            producer.flush();
        }
        return featureData;
    }

    private GenericRecord convertPojoToGenericRecord(Object input, Schema featureGroupSchema, Schema encodedFeatureGroupSchema, Map<String, Schema> complexFeatureSchemas) throws NoSuchFieldException, IllegalAccessException, FeatureStoreException, IOException {
        GenericRecord plainRecord = input instanceof GenericRecord ? (GenericRecord)input : this.convertPojoToGenericRecord(input, featureGroupSchema);
        GenericData.Record encodedRecord = new GenericData.Record(encodedFeatureGroupSchema);
        for (Schema.Field field : encodedFeatureGroupSchema.getFields()) {
            if (complexFeatureSchemas.containsKey(field.name())) {
                Schema complexFieldSchema = complexFeatureSchemas.get(field.name());
                GenericDatumWriter complexFeatureDatumWriter = new GenericDatumWriter(complexFieldSchema);
                ByteArrayOutputStream complexFeatureByteArrayOutputStream = new ByteArrayOutputStream();
                Throwable throwable = null;
                try {
                    BinaryEncoder complexFeatureBinaryEncoder = new EncoderFactory().binaryEncoder((OutputStream)complexFeatureByteArrayOutputStream, null);
                    complexFeatureDatumWriter.write(plainRecord.get(field.name()), (Encoder)complexFeatureBinaryEncoder);
                    complexFeatureBinaryEncoder.flush();
                    encodedRecord.put(field.name(), (Object)ByteBuffer.wrap(complexFeatureByteArrayOutputStream.toByteArray()));
                    continue;
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (complexFeatureByteArrayOutputStream == null) continue;
                    if (throwable != null) {
                        try {
                            complexFeatureByteArrayOutputStream.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    complexFeatureByteArrayOutputStream.close();
                    continue;
                }
            }
            encodedRecord.put(field.name(), plainRecord.get(field.name()));
        }
        return encodedRecord;
    }

    private GenericRecord convertPojoToGenericRecord(Object input, Schema featureGroupSchema) throws NoSuchFieldException, IllegalAccessException, FeatureStoreException {
        GenericData.Record record = new GenericData.Record(featureGroupSchema);
        for (Schema.Field schemaField : featureGroupSchema.getFields()) {
            Field pojoField = input.getClass().getDeclaredField(schemaField.name());
            pojoField.setAccessible(true);
            Object pojoValue = pojoField.get(input);
            record.put(schemaField.name(), this.convertValue(pojoValue, schemaField.schema()));
        }
        return record;
    }

    private Object convertValue(Object value, Schema schema) throws NoSuchFieldException, IllegalAccessException, FeatureStoreException {
        if (value == null) {
            return null;
        }
        switch (schema.getType()) {
            case RECORD: {
                return this.convertPojoToGenericRecord(value, schema);
            }
            case ARRAY: {
                Schema elementType = schema.getElementType();
                if (value instanceof Collection) {
                    Collection collection = (Collection)value;
                    ArrayList<Object> avroList = new ArrayList<Object>();
                    for (Object item : collection) {
                        avroList.add(this.convertValue(item, elementType));
                    }
                    return avroList;
                }
                if (value.getClass().isArray()) {
                    ArrayList<Object> avroList = new ArrayList<Object>();
                    for (Object item : (Object[])value) {
                        avroList.add(this.convertValue(item, elementType));
                    }
                    return avroList;
                }
                throw new FeatureStoreException("Unsupported array type: " + value.getClass());
            }
            case UNION: {
                for (Schema subSchema : schema.getTypes()) {
                    if (subSchema.getType() == Schema.Type.NULL) continue;
                    try {
                        return this.convertValue(value, subSchema);
                    }
                    catch (Exception exception) {
                    }
                }
                throw new FeatureStoreException("Cannot match union type for value: " + value.getClass());
            }
            case ENUM: {
                return new GenericData.EnumSymbol(schema, value.toString());
            }
            case STRING: {
                return value.toString();
            }
            case INT: 
            case LONG: 
            case FLOAT: 
            case DOUBLE: 
            case BOOLEAN: {
                return value;
            }
            case MAP: {
                if (value instanceof Map) {
                    HashMap<String, Object> avroMap = new HashMap<String, Object>();
                    for (Map.Entry entry : ((Map)value).entrySet()) {
                        if (!(entry.getKey() instanceof String)) {
                            throw new FeatureStoreException("Avro only supports string keys in maps.");
                        }
                        avroMap.put(entry.getKey().toString(), this.convertValue(entry.getValue(), schema.getValueType()));
                    }
                    return avroMap;
                }
                throw new FeatureStoreException("Unsupported map type: " + value.getClass());
            }
        }
        throw new FeatureStoreException("Unsupported Avro type: " + schema.getType());
    }

    @Override
    public String addFile(String filePath) throws IOException, FeatureStoreException {
        if (Strings.isNullOrEmpty((String)filePath)) {
            return filePath;
        }
        String targetPath = System.getProperty("java.io.tmpdir") + filePath.substring(filePath.lastIndexOf("/"));
        try (FileOutputStream outputStream = new FileOutputStream(targetPath);){
            outputStream.write(DatasetApi.readContent(filePath, this.featureGroupUtils.getDatasetType(filePath)));
        }
        return targetPath;
    }

    @Override
    public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        boolean external = !System.getProperties().containsKey("hopsworks.restendpoint") && (writeOptions == null || !Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false")));
        StorageConnector.KafkaConnector storageConnector = this.storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
        storageConnector.setSslTruststoreLocation(this.addFile(storageConnector.getSslTruststoreLocation()));
        storageConnector.setSslKeystoreLocation(this.addFile(storageConnector.getSslKeystoreLocation()));
        Map<String, String> config = storageConnector.kafkaOptions();
        if (writeOptions != null) {
            config.putAll(writeOptions);
        }
        config.put("enable.idempotence", "false");
        return config;
    }
}

