package com.logicalclocks.hsfs.flink.engine;

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.class */
public class KafkaRecordSerializer implements KafkaRecordSerializationSchema<GenericRecord> {
    private final String topic;
    private final List<String> primaryKeys;
    private final Map<String, byte[]> headerMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaRecordSerializer(StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException {
        this.topic = streamFeatureGroup.getOnlineTopicName();
        this.primaryKeys = streamFeatureGroup.getPrimaryKeys();
        this.headerMap.put("projectId", String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
        this.headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8));
        this.headerMap.put("subjectId", String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
    }

    public void open(SerializationSchema.InitializationContext initializationContext, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) {
    }

    public ProducerRecord<byte[], byte[]> serialize(GenericRecord genericRecord, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext, Long l) {
        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(this.topic, (Integer) null, l, serializeKey(genericRecord), serializeValue(genericRecord));
        for (Map.Entry<String, byte[]> entry : this.headerMap.entrySet()) {
            producerRecord.headers().add(entry.getKey(), entry.getValue());
        }
        return producerRecord;
    }

    public byte[] serializeKey(GenericRecord genericRecord) {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = this.primaryKeys.iterator();
        while (it.hasNext()) {
            arrayList.add(genericRecord.get(it.next()).toString());
        }
        return String.join(";", arrayList).getBytes(StandardCharsets.UTF_8);
    }

    public byte[] serializeValue(GenericRecord genericRecord) {
        ReflectDatumWriter reflectDatumWriter = new ReflectDatumWriter(genericRecord.getSchema());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        byteArrayOutputStream.reset();
        BinaryEncoder binaryEncoder = new EncoderFactory().binaryEncoder(byteArrayOutputStream, null);
        try {
            reflectDatumWriter.write(genericRecord, binaryEncoder);
            binaryEncoder.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return byteArrayOutputStream.toByteArray();
    }
}
