package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.class */
class SubtaskCommittableManagerTest {
    SubtaskCommittableManagerTest() {
    }

    @Test
    void testDrainCommittables() {
        SubtaskCommittableManager subtaskCommittableManager = new SubtaskCommittableManager(3, 1, 1L);
        CommittableWithLineage committableWithLineage = new CommittableWithLineage(1, 1L, 1);
        CommittableWithLineage committableWithLineage2 = new CommittableWithLineage(2, 1L, 1);
        CommittableWithLineage committableWithLineage3 = new CommittableWithLineage(3, 1L, 1);
        Assertions.assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(0);
        subtaskCommittableManager.add(committableWithLineage);
        subtaskCommittableManager.add(committableWithLineage2);
        subtaskCommittableManager.add(committableWithLineage3);
        Assertions.assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(3);
        Assertions.assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(3);
        Assertions.assertThat(subtaskCommittableManager.getNumDrained()).isEqualTo(0);
        Assertions.assertThat(subtaskCommittableManager.isFinished()).isFalse();
        Iterator it = subtaskCommittableManager.getRequests().iterator();
        IntStream.range(0, 2).forEach(i -> {
            ((CommitRequestImpl) it.next()).setCommittedIfNoError();
        });
        Assertions.assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(1);
        Assertions.assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(3);
        Assertions.assertThat(subtaskCommittableManager.getNumDrained()).isEqualTo(0);
        List drainCommitted = subtaskCommittableManager.drainCommitted();
        Assertions.assertThat(drainCommitted).hasSize(2);
        Assertions.assertThat(drainCommitted.get(0)).satisfies(new ThrowingConsumer[]{committableWithLineage4 -> {
            Assertions.assertThat(committableWithLineage4.getSubtaskId()).isEqualTo(1);
            Assertions.assertThat((Integer) committableWithLineage4.getCommittable()).isEqualTo(1);
            Assertions.assertThat(committableWithLineage4.getCheckpointId()).hasValue(1L);
        }});
        Assertions.assertThat(drainCommitted.get(1)).satisfies(new ThrowingConsumer[]{committableWithLineage5 -> {
            Assertions.assertThat(committableWithLineage5.getSubtaskId()).isEqualTo(1);
            Assertions.assertThat((Integer) committableWithLineage5.getCommittable()).isEqualTo(2);
            Assertions.assertThat(committableWithLineage5.getCheckpointId()).hasValue(1L);
        }});
        Assertions.assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(0);
        Assertions.assertThat(subtaskCommittableManager.drainCommitted()).hasSize(0);
        ((CommitRequestImpl) it.next()).signalFailedWithKnownReason(new RuntimeException("Unused exception"));
        Assertions.assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(0);
        Assertions.assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(0);
        Assertions.assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(3);
        Assertions.assertThat(subtaskCommittableManager.isFinished()).isFalse();
        Assertions.assertThat(subtaskCommittableManager.drainCommitted()).hasSize(0);
        Assertions.assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(1);
        Assertions.assertThat(subtaskCommittableManager.isFinished()).isTrue();
    }

    @Test
    void testMerge() {
        SubtaskCommittableManager subtaskCommittableManager = new SubtaskCommittableManager(Collections.singletonList(new CommitRequestImpl(1)), 5, 1, 2, 1, 2L);
        subtaskCommittableManager.merge(new SubtaskCommittableManager(Arrays.asList(new CommitRequestImpl(2), new CommitRequestImpl(3)), 10, 2, 3, 1, 2L));
        Assertions.assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(11);
        Assertions.assertThat(subtaskCommittableManager.getNumDrained()).isEqualTo(3);
        Assertions.assertThat(subtaskCommittableManager.isFinished()).isFalse();
        Assertions.assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(5);
        Assertions.assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(3);
    }
}
