package org.apache.hudi.sink.bucket;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.clustering.update.strategy.FlinkConsistentBucketUpdateStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.class */
public class ConsistentBucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(ConsistentBucketStreamWriteFunction.class);
    private transient FlinkConsistentBucketUpdateStrategy updateStrategy;

    public ConsistentBucketStreamWriteFunction(Configuration configuration) {
        super(configuration);
    }

    @Override // org.apache.hudi.sink.StreamWriteFunction
    public void open(Configuration configuration) throws IOException {
        super.open(configuration);
        this.updateStrategy = new FlinkConsistentBucketUpdateStrategy(this.writeClient, Arrays.asList(this.config.getString(FlinkOptions.INDEX_KEY_FIELD).split(",")));
    }

    @Override // org.apache.hudi.sink.StreamWriteFunction, org.apache.hudi.sink.common.AbstractStreamWriteFunction
    public void snapshotState() {
        super.snapshotState();
        this.updateStrategy.reset();
    }

    @Override // org.apache.hudi.sink.StreamWriteFunction
    protected List<WriteStatus> writeBucket(String str, StreamWriteFunction.DataBucket dataBucket, List<HoodieRecord> list) {
        this.updateStrategy.initialize(this.writeClient);
        dataBucket.preWrite(list);
        return (List) ((List) this.updateStrategy.handleUpdate(Collections.singletonList(Pair.of(list, str))).getKey()).stream().flatMap(pair -> {
            return ((List) this.writeFunction.apply(pair.getLeft(), pair.getRight())).stream();
        }).collect(Collectors.toList());
    }
}
