/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StreamFeatureGroup;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Turns Avro {@link GenericRecord}s into Kafka {@link ProducerRecord}s for the online topic of a
 * {@link StreamFeatureGroup}.
 *
 * <p>Each produced record carries:
 * <ul>
 *   <li>a key built from the record's primary key values joined with {@code ";"},</li>
 *   <li>a value holding the Avro binary encoding of the record,</li>
 *   <li>the headers {@code projectId}, {@code featureGroupId} and {@code subjectId}, each the
 *       UTF-8 bytes of the corresponding id's string form, captured once at construction.</li>
 * </ul>
 */
public class KafkaRecordSerializer {
    private final String topic;
    private final List<String> primaryKeys;
    private final Map<String, byte[]> headerMap = new HashMap<String, byte[]>();

    /**
     * Captures the topic name, primary keys and header values from the given feature group.
     *
     * @param streamFeatureGroup feature group whose online topic the records are destined for
     * @throws FeatureStoreException propagated from the feature group accessors
     * @throws IOException propagated from the feature group accessors
     */
    KafkaRecordSerializer(StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException {
        this.topic = streamFeatureGroup.getOnlineTopicName();
        this.primaryKeys = streamFeatureGroup.getPrimaryKeys();
        this.headerMap.put("projectId", String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
        this.headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8));
        this.headerMap.put("subjectId", String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Builds the Kafka record for one Avro record: serialized key, serialized value and the
     * headers captured at construction. No partition is set on the record.
     *
     * @param genericRecord the Avro record to publish
     * @return a producer record targeting this serializer's topic
     * @throws NullPointerException if a primary key value is {@code null} in the record
     * @throws UncheckedIOException if Avro encoding of the value fails
     */
    public ProducerRecord<byte[], byte[]> serialize(GenericRecord genericRecord) {
        byte[] key = this.serializeKey(genericRecord);
        byte[] value = this.serializeValue(genericRecord);
        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(this.topic, null, key, value);
        for (Map.Entry<String, byte[]> header : this.headerMap.entrySet()) {
            producerRecord.headers().add(header.getKey(), header.getValue());
        }
        return producerRecord;
    }

    /**
     * Serializes the record key: the string form of every primary key value, in the order of the
     * feature group's primary keys, joined with {@code ";"} and encoded as UTF-8.
     *
     * @param genericRecord the Avro record to read primary key values from
     * @return UTF-8 bytes of the joined primary key values
     * @throws NullPointerException if a primary key value is {@code null} in the record
     */
    public byte[] serializeKey(GenericRecord genericRecord) {
        List<String> primaryKeyValues = new ArrayList<String>(this.primaryKeys.size());
        for (String primaryKey : this.primaryKeys) {
            Object keyValue = genericRecord.get(primaryKey);
            if (keyValue == null) {
                // Same exception type as the bare dereference would raise, but naming the key.
                throw new NullPointerException(
                    "Primary key '" + primaryKey + "' has no value in the record; cannot build the Kafka key");
            }
            primaryKeyValues.add(keyValue.toString());
        }
        return String.join(";", primaryKeyValues).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Serializes the record value using Avro binary encoding with the record's own schema.
     *
     * @param genericRecord the Avro record to encode
     * @return the Avro binary encoding of the record
     * @throws UncheckedIOException if the encoder reports an I/O failure; the partially written
     *     buffer is never returned, so a truncated payload cannot be published
     */
    public byte[] serializeValue(GenericRecord genericRecord) {
        ReflectDatumWriter<GenericRecord> datumWriter =
            new ReflectDatumWriter<GenericRecord>(genericRecord.getSchema());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // Shared default factory; passing null asks for a fresh encoder rather than reusing one.
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        try {
            datumWriter.write(genericRecord, binaryEncoder);
            // The encoder buffers internally; flush so every byte reaches the stream.
            binaryEncoder.flush();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to Avro-encode record for topic '" + this.topic + "'", e);
        }
        return byteArrayOutputStream.toByteArray();
    }
}

