package org.apache.flink.connector.file.table.batch.compact;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.file.table.BinPacking;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinator.class */
public class BatchCompactCoordinator extends AbstractStreamOperator<CompactMessages.CoordinatorOutput> implements OneInputStreamOperator<CompactMessages.CoordinatorInput, CompactMessages.CoordinatorOutput>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private final SupplierWithException<FileSystem, IOException> fsFactory;
    private final long compactAverageSize;
    private final long compactTargetSize;
    private transient FileSystem fs;
    private transient Map<String, List<Path>> inputFiles;
    private transient StreamRecord<CompactMessages.CoordinatorOutput> element;

    public BatchCompactCoordinator(SupplierWithException<FileSystem, IOException> supplierWithException, long j, long j2) {
        this.fsFactory = supplierWithException;
        this.compactAverageSize = j;
        this.compactTargetSize = j2;
    }

    public void open() throws Exception {
        this.fs = (FileSystem) this.fsFactory.get();
        this.inputFiles = new HashMap();
        this.element = new StreamRecord<>((Object) null);
    }

    public void processElement(StreamRecord<CompactMessages.CoordinatorInput> streamRecord) throws Exception {
        CompactMessages.CoordinatorInput coordinatorInput = (CompactMessages.CoordinatorInput) streamRecord.getValue();
        if (!(coordinatorInput instanceof CompactMessages.InputFile)) {
            throw new UnsupportedOperationException("Unsupported input message: " + coordinatorInput);
        }
        CompactMessages.InputFile inputFile = (CompactMessages.InputFile) coordinatorInput;
        this.inputFiles.computeIfAbsent(inputFile.getPartition(), str -> {
            return new ArrayList();
        }).add(inputFile.getFile());
    }

    public void endInput() throws Exception {
        for (Map.Entry<String, List<Path>> entry : this.inputFiles.entrySet()) {
            compactPartitionFiles(entry.getKey(), entry.getValue());
        }
    }

    public void close() throws Exception {
        this.inputFiles.clear();
    }

    private void compactPartitionFiles(String str, List<Path> list) throws IOException {
        if (list.isEmpty()) {
            return;
        }
        int i = 0;
        Map<Path, Long> filesSize = getFilesSize(this.fs, list);
        if (getAverageSize(filesSize) >= this.compactAverageSize) {
            Iterator<Path> it = list.iterator();
            while (it.hasNext()) {
                int i2 = i;
                i++;
                this.output.collect(this.element.replace(new CompactMessages.CompactionUnit(i2, str, Collections.singletonList(it.next()))));
            }
            return;
        }
        filesSize.getClass();
        Iterator it2 = BinPacking.pack(list, (v1) -> {
            return r0.get(v1);
        }, this.compactTargetSize).iterator();
        while (it2.hasNext()) {
            int i3 = i;
            i++;
            this.output.collect(this.element.replace(new CompactMessages.CompactionUnit(i3, str, (List) it2.next())));
        }
    }

    private Map<Path, Long> getFilesSize(FileSystem fileSystem, List<Path> list) throws IOException {
        HashMap hashMap = new HashMap();
        for (Path path : list) {
            hashMap.put(path, Long.valueOf(fileSystem.getFileStatus(path).getLen()));
        }
        return hashMap;
    }

    private double getAverageSize(Map<Path, Long> map) {
        int i = 0;
        long j = 0;
        Iterator<Map.Entry<Path, Long>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            i++;
            j += it.next().getValue().longValue();
        }
        return j / (i * 1.0d);
    }
}
