package org.apache.flink.connector.file.sink.compactor.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Operator that gathers {@link CompactorRequest}s, hands them to a {@link CompactService} and
 * forwards the resulting {@link FileSinkCommittable}s downstream.
 *
 * <p>Path of a request through this operator:
 *
 * <ol>
 *   <li>{@link #processElement} buffers it in {@code collectingRequests};
 *   <li>{@link #snapshotState} files the buffer under the id of the checkpoint being taken;
 *   <li>{@link #notifyCheckpointComplete} submits every request filed under an id up to the
 *       completed one;
 *   <li>{@link #prepareSnapshotPreBarrier} and {@link #endInput} emit the results of the
 *       submissions that have finished.
 * </ol>
 *
 * <p>Requests restored from state are merged back in {@link #initializeState}; those that had
 * already been submitted (stored under {@code SUBMITTED_ID}) are resubmitted in {@link #open}.
 */
@Internal
public class CompactorOperator extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
        implements OneInputStreamOperator<CompactorRequest, CommittableMessage<FileSinkCommittable>>,
                BoundedOneInput,
                CheckpointListener {

    /** Pseudo checkpoint id under which already-submitted requests are stored in state. */
    private static final long SUBMITTED_ID = -1L;

    /** Descriptor of the raw (byte[]) list state that backs {@code remainingRequestsState}. */
    static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC =
            new ListStateDescriptor<>(
                    "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE);

    private final FileCompactStrategy strategy;
    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
    private final FileCompactor fileCompactor;
    private final BucketWriter<?, String> bucketWriter;

    /** Created in {@link #open()}, released in {@link #close()}. */
    private transient CompactService compactService;

    /** Requests received since the last snapshot (or since start). */
    private List<CompactorRequest> collectingRequests = new ArrayList<>();

    /** Requests waiting for submission, keyed by the checkpoint id they were snapshotted with. */
    private final TreeMap<Long, List<CompactorRequest>> checkpointRequests = new TreeMap<>();

    /** Submitted requests paired with the future that will carry their result. */
    private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
            compactingRequests = new LinkedList<>();

    /** Operator state holding everything that has not been emitted yet. */
    private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;

    public CompactorOperator(
            FileCompactStrategy fileCompactStrategy,
            SimpleVersionedSerializer<FileSinkCommittable> simpleVersionedSerializer,
            FileCompactor fileCompactor,
            BucketWriter<?, String> bucketWriter) {
        this.strategy = fileCompactStrategy;
        this.committableSerializer = simpleVersionedSerializer;
        this.fileCompactor = fileCompactor;
        this.bucketWriter = bucketWriter;
    }

    /**
     * Starts the compact service and resubmits the requests that were stored under
     * {@code SUBMITTED_ID}.
     */
    @Override
    public void open() throws Exception {
        super.open();
        compactService =
                new CompactService(strategy.getNumCompactThreads(), fileCompactor, bucketWriter);
        compactService.open();
        submitUntil(SUBMITTED_ID);
    }

    /** Buffers the incoming request; nothing is submitted here. */
    @Override
    public void processElement(StreamRecord<CompactorRequest> streamRecord) throws Exception {
        collectingRequests.add(streamRecord.getValue());
    }

    /**
     * Flushes everything at end of input: files the buffered requests under
     * {@code Long.MAX_VALUE}, submits all pending requests, waits for every submission to finish
     * and emits the results without a checkpoint id.
     */
    @Override
    public void endInput() throws Exception {
        checkpointRequests.put(Long.MAX_VALUE, collectingRequests);
        collectingRequests = new ArrayList<>();

        submitUntil(Long.MAX_VALUE);
        assert checkpointRequests.isEmpty();

        getAllTasksFuture().join();
        emitCompacted(null);
        assert compactingRequests.isEmpty();
    }

    /** Submits every request filed under a checkpoint id up to and including the completed one. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        submitUntil(checkpointId);
    }

    /** Emits the results of all finished submissions, tagged with the given checkpoint id. */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        emitCompacted(checkpointId);
    }

    /**
     * Files the buffered requests under the current checkpoint id and writes them to state,
     * together with the requests that are still in {@code compactingRequests} (stored under
     * {@code SUBMITTED_ID}).
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);

        checkpointRequests.put(stateSnapshotContext.getCheckpointId(), collectingRequests);
        collectingRequests = new ArrayList<>();

        // Shallow copy: only the extra SUBMITTED_ID entry is added to the snapshot view, the
        // live checkpointRequests map keeps its own key set.
        Map<Long, List<CompactorRequest>> snapshot = new HashMap<>(checkpointRequests);
        List<CompactorRequest> submitted =
                compactingRequests.stream().map(pending -> pending.f0).collect(Collectors.toList());
        snapshot.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>()).addAll(submitted);

        remainingRequestsState.update(Collections.singletonList(snapshot));
    }

    /**
     * Binds the operator state and merges every restored map into {@code checkpointRequests};
     * lists that share a checkpoint id are concatenated.
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext)
            throws Exception {
        super.initializeState(stateInitializationContext);

        remainingRequestsState =
                new SimpleVersionedListState<>(
                        stateInitializationContext
                                .getOperatorStateStore()
                                .getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
                        new RemainingRequestsSerializer(
                                new CompactorRequestSerializer(committableSerializer)));

        Iterable<Map<Long, List<CompactorRequest>>> restored = remainingRequestsState.get();
        if (restored == null) {
            return;
        }
        for (Map<Long, List<CompactorRequest>> requestsById : restored) {
            for (Map.Entry<Long, List<CompactorRequest>> entry : requestsById.entrySet()) {
                checkpointRequests
                        .computeIfAbsent(entry.getKey(), id -> new ArrayList<>())
                        .addAll(entry.getValue());
            }
        }
    }

    /**
     * Closes the compact service (if it was created) and then delegates to the superclass so
     * that its own clean-up always runs, even when closing the service throws.
     */
    @Override
    public void close() throws Exception {
        try {
            if (compactService != null) {
                compactService.close();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Submits all requests filed under an id {@code <= checkpointId} to the compact service,
     * tracks them in {@code compactingRequests} and removes them from
     * {@code checkpointRequests}.
     */
    private void submitUntil(long checkpointId) {
        // headMap is a live view, so clearing it below removes the entries from the backing map.
        NavigableMap<Long, List<CompactorRequest>> due =
                checkpointRequests.headMap(checkpointId, true);
        for (List<CompactorRequest> requests : due.values()) {
            for (CompactorRequest request : requests) {
                CompletableFuture<Iterable<FileSinkCommittable>> result = new CompletableFuture<>();
                compactingRequests.add(new Tuple2<>(request, result));
                compactService.submit(request, result);
            }
        }
        due.clear();
    }

    /**
     * Collects the results of all finished submissions and, if there are any, emits one
     * {@link CommittableSummary} followed by one {@link CommittableWithLineage} per committable.
     *
     * @param checkpointId id attached to the emitted messages; {@code null} at end of input
     * @throws Exception if a finished future completed exceptionally
     */
    private void emitCompacted(@Nullable Long checkpointId) throws Exception {
        List<FileSinkCommittable> compacted = new ArrayList<>();
        Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
                pendingIterator = compactingRequests.iterator();
        while (pendingIterator.hasNext()) {
            CompletableFuture<Iterable<FileSinkCommittable>> result = pendingIterator.next().f1;
            if (!result.isDone()) {
                continue;
            }
            // Removed before get(): a failed request is dropped from the list and its
            // exception propagates to the caller.
            pendingIterator.remove();
            for (FileSinkCommittable committable : result.get()) {
                compacted.add(committable);
            }
        }

        if (compacted.isEmpty()) {
            return;
        }

        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();

        // The summary goes out first, then the individual committables.
        CommittableSummary<FileSinkCommittable> summary =
                new CommittableSummary<>(
                        subtaskIndex,
                        parallelism,
                        checkpointId,
                        compacted.size(),
                        compacted.size(),
                        0);
        output.collect(new StreamRecord<>(summary));

        for (FileSinkCommittable committable : compacted) {
            CommittableWithLineage<FileSinkCommittable> message =
                    new CommittableWithLineage<>(
                            committable,
                            checkpointId,
                            getRuntimeContext().getIndexOfThisSubtask());
            output.collect(new StreamRecord<>(message));
        }
    }

    /** Returns a future that completes once every currently submitted request has completed. */
    @VisibleForTesting
    public CompletableFuture<?> getAllTasksFuture() {
        return CompletableFuture.allOf(
                compactingRequests.stream()
                        .map(pending -> pending.f1)
                        .toArray(CompletableFuture[]::new));
    }

    /**
     * Serializer for the "checkpoint id to requests" map kept in operator state.
     *
     * <p>Layout written by version 1: magic number (int), entry count (int), then per entry the
     * checkpoint id (long) followed by the versioned request list.
     */
    static class RemainingRequestsSerializer
            implements SimpleVersionedSerializer<Map<Long, List<CompactorRequest>>> {

        /** Same value as the decimal literal -1454981501. */
        private static final int MAGIC_NUMBER = 0xA946BE83;

        private final CompactorRequestSerializer requestSerializer;

        public RemainingRequestsSerializer(CompactorRequestSerializer compactorRequestSerializer) {
            this.requestSerializer = compactorRequestSerializer;
        }

        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(Map<Long, List<CompactorRequest>> map) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(256);
            out.writeInt(MAGIC_NUMBER);
            serializeV1(map, out);
            return out.getCopyOfBuffer();
        }

        /**
         * Restores a map written by {@link #serialize}.
         *
         * @throws IOException if the version is unknown, the magic number does not match or the
         *     entry count is negative
         */
        @Override
        public Map<Long, List<CompactorRequest>> deserialize(int version, byte[] serialized)
                throws IOException {
            DataInputDeserializer in = new DataInputDeserializer(serialized);
            switch (version) {
                case 1:
                    validateMagicNumber(in);
                    return deserializeV1(in);
                default:
                    throw new IOException("Unrecognized version or corrupt state: " + version);
            }
        }

        private void serializeV1(Map<Long, List<CompactorRequest>> map, DataOutputSerializer out)
                throws IOException {
            out.writeInt(map.size());
            for (Map.Entry<Long, List<CompactorRequest>> entry : map.entrySet()) {
                out.writeLong(entry.getKey());
                SimpleVersionedSerialization.writeVersionAndSerializeList(
                        requestSerializer, entry.getValue(), out);
            }
        }

        private Map<Long, List<CompactorRequest>> deserializeV1(DataInputDeserializer in)
                throws IOException {
            int size = in.readInt();
            if (size < 0) {
                // Report corruption as IOException like the other checks in this serializer,
                // instead of letting HashMap reject the negative capacity.
                throw new IOException("Corrupt data: Negative number of entries " + size);
            }
            Map<Long, List<CompactorRequest>> requestsById = new HashMap<>(size);
            for (int i = 0; i < size; i++) {
                long checkpointId = in.readLong();
                requestsById.put(
                        checkpointId,
                        SimpleVersionedSerialization.readVersionAndDeserializeList(
                                requestSerializer, in));
            }
            return requestsById;
        }

        private static void validateMagicNumber(DataInputView in) throws IOException {
            int magicNumber = in.readInt();
            if (magicNumber != MAGIC_NUMBER) {
                throw new IOException(
                        String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
            }
        }
    }
}
