/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.flink.engine;

import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaRecordSerializer
implements KafkaRecordSerializationSchema<GenericRecord> {
    private final String topic;
    private final List<String> primaryKeys;
    final Map<String, byte[]> headerMap;

    public KafkaRecordSerializer(StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException {
        this.topic = streamFeatureGroup.getOnlineTopicName();
        this.primaryKeys = streamFeatureGroup.getPrimaryKeys();
        this.headerMap = FeatureGroupUtils.getHeaders((FeatureGroupBase)streamFeatureGroup, null);
    }

    public void open(SerializationSchema.InitializationContext context, KafkaRecordSerializationSchema.KafkaSinkContext sinkContext) {
    }

    public ProducerRecord<byte[], byte[]> serialize(GenericRecord genericRecord, KafkaRecordSerializationSchema.KafkaSinkContext context, Long timestamp) {
        byte[] key = this.serializeKey(genericRecord);
        byte[] value = this.serializeValue(genericRecord);
        ProducerRecord producerRecord = new ProducerRecord(this.topic, null, timestamp, (Object)key, (Object)value);
        for (Map.Entry<String, byte[]> entry : this.headerMap.entrySet()) {
            producerRecord.headers().add(entry.getKey(), entry.getValue());
        }
        return producerRecord;
    }

    public byte[] serializeKey(GenericRecord genericRecord) {
        ArrayList<String> primaryKeyValues = new ArrayList<String>();
        for (String primaryKey : this.primaryKeys) {
            primaryKeyValues.add(genericRecord.get(primaryKey).toString());
        }
        return String.join((CharSequence)";", primaryKeyValues).getBytes(StandardCharsets.UTF_8);
    }

    public byte[] serializeValue(GenericRecord genericRecord) {
        ReflectDatumWriter datumWriter = new ReflectDatumWriter(genericRecord.getSchema());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        byteArrayOutputStream.reset();
        BinaryEncoder binaryEncoder = new EncoderFactory().binaryEncoder((OutputStream)byteArrayOutputStream, null);
        try {
            datumWriter.write((Object)genericRecord, (Encoder)binaryEncoder);
            binaryEncoder.flush();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return byteArrayOutputStream.toByteArray();
    }
}

