/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.flink.engine;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

public class PojoToAvroRecord<T>
extends RichMapFunction<T, GenericRecord>
implements ResultTypeQueryable<GenericRecord> {
    private final String schema;
    private final String encodedSchema;
    private final Map<String, String> complexFeatureSchemas;
    private transient Schema deserializedSchema;
    private transient Schema deserializedEncodedSchema;
    private transient Map<String, Schema> deserializedComplexFeatureSchemas;
    private transient GenericRecordAvroTypeInfo producedType;

    public PojoToAvroRecord(Schema schema, Schema encodedSchema, Map<String, String> complexFeatureSchemas) {
        this.schema = schema.toString();
        this.encodedSchema = encodedSchema.toString();
        this.complexFeatureSchemas = complexFeatureSchemas;
    }

    public GenericRecord map(T input) throws Exception {
        this.validatePojoAgainstSchema(input, this.deserializedSchema);
        GenericData.Record record = new GenericData.Record(this.deserializedEncodedSchema);
        List fields = Arrays.stream(input.getClass().getDeclaredFields()).filter(f -> f.getName().equals("SCHEMA$")).collect(Collectors.toList());
        if (!fields.isEmpty()) {
            Field schemaField = input.getClass().getDeclaredField("SCHEMA$");
            schemaField.setAccessible(true);
            Schema fieldSchema = (Schema)schemaField.get(null);
            for (Schema.Field field : fieldSchema.getFields()) {
                String fieldName = field.name();
                Field pojoField = input.getClass().getDeclaredField(fieldName);
                pojoField.setAccessible(true);
                Object fieldValue = pojoField.get(input);
                this.populateAvroRecord((GenericRecord)record, fieldName, fieldValue);
            }
        } else {
            for (Field field : fields) {
                String fieldName = field.getName();
                Object fieldValue = field.get(input);
                this.populateAvroRecord((GenericRecord)record, fieldName, fieldValue);
            }
        }
        return record;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.deserializedSchema = new Schema.Parser().parse(this.schema);
        this.deserializedEncodedSchema = new Schema.Parser().parse(this.encodedSchema);
        this.deserializedComplexFeatureSchemas = new HashMap<String, Schema>();
        for (String featureName : this.complexFeatureSchemas.keySet()) {
            this.deserializedComplexFeatureSchemas.put(featureName, new Schema.Parser().parse(this.complexFeatureSchemas.get(featureName)));
        }
        this.producedType = new GenericRecordAvroTypeInfo(this.deserializedEncodedSchema);
    }

    public TypeInformation<GenericRecord> getProducedType() {
        return this.producedType;
    }

    private void populateAvroRecord(GenericRecord record, String fieldName, Object fieldValue) throws IOException {
        if (this.deserializedComplexFeatureSchemas.containsKey(fieldName)) {
            GenericDatumWriter complexFeatureDatumWriter = new GenericDatumWriter(this.deserializedComplexFeatureSchemas.get(fieldName));
            ByteArrayOutputStream complexFeatureByteArrayOutputStream = new ByteArrayOutputStream();
            complexFeatureByteArrayOutputStream.reset();
            BinaryEncoder complexFeatureBinaryEncoder = new EncoderFactory().binaryEncoder((OutputStream)complexFeatureByteArrayOutputStream, null);
            complexFeatureDatumWriter.write(fieldValue, (Encoder)complexFeatureBinaryEncoder);
            complexFeatureBinaryEncoder.flush();
            record.put(fieldName, (Object)ByteBuffer.wrap(complexFeatureByteArrayOutputStream.toByteArray()));
            complexFeatureByteArrayOutputStream.flush();
            complexFeatureByteArrayOutputStream.close();
        } else {
            record.put(fieldName, fieldValue);
        }
    }

    private void validatePojoAgainstSchema(Object pojo, Schema avroSchema) throws SchemaValidationException {
        Schema pojoSchema = ReflectData.get().getSchema(pojo.getClass());
        SchemaValidatorBuilder builder = new SchemaValidatorBuilder();
        builder.canReadStrategy().validateAll().validate(avroSchema, Collections.singletonList(pojoSchema));
    }
}

