/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.slf4j.Logger;

public class TestStreamWriteOperatorCoordinator {
    private StreamWriteOperatorCoordinator coordinator;
    @TempDir
    File tempFile;

    @BeforeEach
    public void before() throws Exception {
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
        this.coordinator = new StreamWriteOperatorCoordinator(TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath()), (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)0));
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)1));
    }

    @AfterEach
    public void after() throws Exception {
        this.coordinator.close();
    }

    @Test
    void testInstantState() {
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        WriteMetadataEvent event0 = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, instant, "par1", true, 0.1);
        WriteMetadataEvent event1 = TestStreamWriteOperatorCoordinator.createOperatorEvent(1, instant, "par2", false, 0.2);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)event1);
        this.coordinator.notifyCheckpointComplete(1L);
        String inflight = TestUtils.getLastPendingInstant(this.tempFile.getAbsolutePath());
        String lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        MatcherAssert.assertThat((String)"Instant should be complete", (Object)lastCompleted, (Matcher)CoreMatchers.is((Object)instant));
        Assertions.assertNotEquals((Object)"", (Object)inflight, (String)"Should start a new instant");
        Assertions.assertNotEquals((Object)instant, (Object)inflight, (String)"Should start a new instant");
    }

    @Test
    public void testTableInitialized() throws IOException {
        org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf((Configuration)new Configuration());
        String basePath = this.tempFile.getAbsolutePath();
        try (FileSystem fs = HadoopFSUtils.getFs((String)basePath, (org.apache.hadoop.conf.Configuration)hadoopConf);){
            Assertions.assertTrue((boolean)fs.exists(new Path(basePath, ".hoodie")));
        }
    }

    @Test
    public void testCheckpointAndRestore() throws Exception {
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        this.coordinator.resetToCheckpoint(1L, (byte[])future.get());
    }

    @Test
    public void testReceiveInvalidEvent() {
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        WriteMetadataEvent event = WriteMetadataEvent.builder().taskID(0).instantTime("abc").writeStatus(Collections.emptyList()).build();
        this.assertError(() -> this.lambda$testReceiveInvalidEvent$0((OperatorEvent)event), "Receive an unexpected event for instant abc from task 0");
    }

    @Test
    public void testEventReset() {
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        WriteMetadataEvent event1 = WriteMetadataEvent.builder().taskID(0).instantTime("001").writeStatus(Collections.emptyList()).build();
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event1);
        this.coordinator.subtaskFailed(0, null);
        Assertions.assertNotNull((Object)this.coordinator.getEventBuffer()[0], (String)"Events should not be cleared by subTask failure");
        WriteMetadataEvent event2 = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, "001", "par1", false, 0.1);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event2);
        this.coordinator.subtaskFailed(0, null);
        Assertions.assertNotNull((Object)this.coordinator.getEventBuffer()[0], (String)"Events should not be cleared by subTask failure");
        WriteMetadataEvent event3 = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, "001", "par1", false, 0.1);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event3);
        MatcherAssert.assertThat((String)"Multiple events of same instant should be merged", (Object)this.coordinator.getEventBuffer()[0].getWriteStatuses().size(), (Matcher)CoreMatchers.is((Object)2));
        WriteMetadataEvent event4 = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, "002", "par1", false, 0.1);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event4);
        MatcherAssert.assertThat((String)"The new event should override the old event", (Object)this.coordinator.getEventBuffer()[0].getWriteStatuses().size(), (Matcher)CoreMatchers.is((Object)1));
    }

    @Test
    public void testCheckpointCompleteWithPartialEvents() throws Exception {
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)0));
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)1));
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        String instant = this.coordinator.getInstant();
        WriteMetadataEvent event = WriteMetadataEvent.builder().taskID(0).instantTime(instant).writeStatus(Collections.emptyList()).build();
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event);
        Assertions.assertDoesNotThrow(() -> this.coordinator.notifyCheckpointComplete(1L), (String)"Returns early for empty write results");
        String lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        Assertions.assertNull((Object)lastCompleted, (String)"Returns early for empty write results");
        Assertions.assertNull((Object)this.coordinator.getEventBuffer()[0]);
        WriteMetadataEvent event1 = TestStreamWriteOperatorCoordinator.createOperatorEvent(1, instant, "par2", false, 0.2);
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)event1);
        Assertions.assertDoesNotThrow(() -> this.coordinator.notifyCheckpointComplete(2L), (String)"Commits the instant with partial events anyway");
        lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        MatcherAssert.assertThat((String)"Commits the instant with partial events anyway", (Object)lastCompleted, (Matcher)CoreMatchers.is((Object)instant));
    }

    @Test
    public void testRecommitWithPartialUncommittedEvents() {
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        String instant = this.coordinator.getInstant();
        String lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        Assertions.assertNull((Object)lastCompleted, (String)"Returns early for empty write results");
        WriteMetadataEvent event1 = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, instant, "par1", false, 0.2);
        event1.setBootstrap(true);
        WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap((int)1);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event1);
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)event2);
        lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        MatcherAssert.assertThat((String)"Recommits the instant with partial uncommitted events", (Object)lastCompleted, (Matcher)CoreMatchers.is((Object)instant));
    }

    @Test
    public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws Exception {
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name());
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        Assertions.assertTrue((boolean)this.coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy());
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.coordinator.getInstant();
        HoodieHeartbeatClient heartbeatClient = this.coordinator.getWriteClient().getHeartbeatClient();
        Assertions.assertNotNull((Object)heartbeatClient.getHeartbeat(instant), (String)"Heartbeat is missing");
        String basePath = this.tempFile.getAbsolutePath();
        HoodieStorage storage = this.coordinator.getWriteClient().getHoodieTable().getStorage();
        Assertions.assertTrue((boolean)HoodieHeartbeatClient.heartbeatExists((HoodieStorage)storage, (String)basePath, (String)instant), (String)"Heartbeat is existed");
        WriteMetadataEvent event1 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event1);
        Assertions.assertFalse((boolean)HoodieHeartbeatClient.heartbeatExists((HoodieStorage)storage, (String)basePath, (String)instant), (String)"Heartbeat is stopped and cleared");
    }

    @Test
    public void testRecommitWithLazyFailedWritesCleanPolicy() {
        this.coordinator.getWriteClient().getConfig().setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY, HoodieFailedWritesCleaningPolicy.LAZY.name());
        Assertions.assertTrue((boolean)this.coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy());
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        String instant = this.coordinator.getInstant();
        WriteMetadataEvent event1 = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, instant, "par1", false, 0.2);
        event1.setBootstrap(true);
        WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap((int)1);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event1);
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)event2);
        MatcherAssert.assertThat((String)"Recommits the instant with lazy failed writes clean policy", (Object)TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath()), (Matcher)CoreMatchers.is((Object)instant));
    }

    @Test
    public void testHiveSyncInvoked() throws Exception {
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.mockWriteWithMetadata();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        Assertions.assertDoesNotThrow(() -> this.coordinator.notifyCheckpointComplete(1L));
    }

    @Test
    void testSyncMetadataTable() throws Exception {
        int numCommits;
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        int metadataCompactionDeltaCommits = 5;
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, metadataCompactionDeltaCommits);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath((String)this.tempFile.getAbsolutePath());
        HoodieTableMetaClient metadataTableMetaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)conf)), (String)metadataTableBasePath);
        HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        HoodieTableMetaClient dataTableMetaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)conf)), (String)new Path(metadataTableBasePath).getParent().getParent().toString());
        int metadataPartitions = dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
        MatcherAssert.assertThat((String)"Instants needed to sync to metadata table do not match", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)metadataPartitions));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).requestedTime(), (Matcher)CoreMatchers.startsWith((String)"00000000000000"));
        for (numCommits = metadataPartitions; numCommits < metadataCompactionDeltaCommits; ++numCommits) {
            instant = this.mockWriteWithMetadata();
            metadataTableMetaClient.reloadActiveTimeline();
            completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
            MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(numCommits + 1)));
            MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).requestedTime(), (Matcher)CoreMatchers.is((Object)instant));
        }
        this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(numCommits + 2)));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.nthFromLastInstant(0).get()).getAction(), (Matcher)CoreMatchers.is((Object)"commit"));
        for (int i = 7; i < 8; ++i) {
            instant = this.mockWriteWithMetadata();
            metadataTableMetaClient.reloadActiveTimeline();
            completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
            MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(i + 1)));
            MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).requestedTime(), (Matcher)CoreMatchers.is((Object)instant));
        }
        instant = this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)9));
        this.mockWriteWithMetadata();
        this.mockWriteWithMetadata();
        this.mockWriteWithMetadata();
        this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)14));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.nthFromLastInstant(1).get()).getAction(), (Matcher)CoreMatchers.is((Object)"commit"));
    }

    @Test
    void testSyncMetadataTableWithLogCompaction() throws Exception {
        int numCommits;
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 20);
        conf.setString("hoodie.metadata.log.compaction.enable", "true");
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath((String)this.tempFile.getAbsolutePath());
        HoodieTableMetaClient metadataTableMetaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)conf)), (String)metadataTableBasePath);
        HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        HoodieTableMetaClient dataTableMetaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)conf)), (String)new Path(metadataTableBasePath).getParent().getParent().toString());
        int metadataPartitions = dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
        MatcherAssert.assertThat((String)"Instants needed to sync to metadata table do not match", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)metadataPartitions));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).requestedTime(), (Matcher)CoreMatchers.startsWith((String)"00000000000000"));
        for (numCommits = metadataPartitions; numCommits < metadataPartitions + 4; ++numCommits) {
            instant = this.mockWriteWithMetadata();
            metadataTableMetaClient.reloadActiveTimeline();
            completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
            MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(numCommits + 1)));
            MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).requestedTime(), (Matcher)CoreMatchers.is((Object)instant));
        }
        this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(numCommits + 2)));
        MatcherAssert.assertThat((String)"The log compaction instant time should be new generated", (Object)((HoodieInstant)completedTimeline.nthFromLastInstant(1).get()).requestedTime(), (Matcher)CoreMatchers.not((Object)instant));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.nthFromLastInstant(1).get()).getAction(), (Matcher)CoreMatchers.is((Object)"deltacommit"));
    }

    @Test
    void testSyncMetadataTableWithRollback() throws Exception {
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath((String)this.tempFile.getAbsolutePath());
        HoodieTableMetaClient metadataTableMetaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)conf)), (String)metadataTableBasePath);
        HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        HoodieTableMetaClient dataTableMetaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)conf)), (String)new Path(metadataTableBasePath).getParent().getParent().toString());
        int metadataPartitions = dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
        MatcherAssert.assertThat((String)"Instants needed to sync to metadata table do not match", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)metadataPartitions));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).requestedTime(), (Matcher)CoreMatchers.startsWith((String)"00000000000000"));
        this.mockWriteWithMetadata();
        instant = this.coordinator.getInstant();
        metadataTableMetaClient.getActiveTimeline().createNewInstant(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, "deltacommit", instant));
        metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight("deltacommit", instant);
        metadataTableMetaClient.reloadActiveTimeline();
        this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(metadataPartitions + 3)));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.nthFromLastInstant(1).get()).requestedTime(), (Matcher)CoreMatchers.is((Object)instant));
        MatcherAssert.assertThat((String)"The pending instant should be rolled back first", (Object)((HoodieInstant)completedTimeline.lastInstant().get()).getAction(), (Matcher)CoreMatchers.is((Object)"rollback"));
    }

    @Test
    public void testEndInputIsTheLastEvent() throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        Logger logger = (Logger)Mockito.mock(Logger.class);
        NonThrownExecutor executor = NonThrownExecutor.builder((Logger)logger).waitForTasksFinish(true).build();
        try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);){
            coordinator.start();
            coordinator.setExecutor(executor);
            coordinator.handleEventFromOperator(0, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)0));
            TimeUnit.SECONDS.sleep(5L);
            int eventCount = 20000;
            for (int i = 0; i < eventCount; ++i) {
                coordinator.handleEventFromOperator(0, (OperatorEvent)TestStreamWriteOperatorCoordinator.createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1));
            }
            WriteMetadataEvent endInput = WriteMetadataEvent.builder().taskID(0).instantTime(coordinator.getInstant()).writeStatus(Collections.emptyList()).endInput(true).build();
            coordinator.handleEventFromOperator(0, (OperatorEvent)endInput);
            executor.close();
            Assertions.assertNull((Object)coordinator.getEventBuffer()[0]);
        }
    }

    @Test
    void testLockForMetadataTable() throws Exception {
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
        conf.setInteger("hoodie.write.lock.client.num_retries", 1);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath((String)this.tempFile.getAbsolutePath());
        HoodieTableMetaClient metadataTableMetaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)conf)), (String)metadataTableBasePath);
        HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        HoodieTableMetaClient dataTableMetaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)conf)), (String)new Path(metadataTableBasePath).getParent().getParent().toString());
        int metadataPartitions = dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
        MatcherAssert.assertThat((String)"Instants needed to sync to metadata table do not match", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)metadataPartitions));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).requestedTime(), (Matcher)CoreMatchers.startsWith((String)"00000000000000"));
        instant = this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(metadataPartitions + 1)));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).requestedTime(), (Matcher)CoreMatchers.is((Object)instant));
    }

    @Test
    public void testCommitOnEmptyBatch() throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), true);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
        MockCoordinatorExecutor executor = new MockCoordinatorExecutor((OperatorCoordinator.Context)context);
        try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);){
            coordinator.start();
            coordinator.setExecutor((NonThrownExecutor)executor);
            coordinator.handleEventFromOperator(0, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)0));
            coordinator.handleEventFromOperator(1, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)1));
            String instant = coordinator.getInstant();
            WriteMetadataEvent event1 = WriteMetadataEvent.builder().taskID(0).instantTime(instant).writeStatus(Collections.emptyList()).lastBatch(true).build();
            WriteMetadataEvent event2 = WriteMetadataEvent.builder().taskID(1).instantTime(instant).writeStatus(Collections.emptyList()).lastBatch(true).build();
            coordinator.handleEventFromOperator(0, (OperatorEvent)event1);
            coordinator.handleEventFromOperator(1, (OperatorEvent)event2);
            Assertions.assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1L), (String)"Commit the instant");
            String lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
            MatcherAssert.assertThat((String)"Commits the instant with empty batch anyway", (Object)lastCompleted, (Matcher)CoreMatchers.is((Object)instant));
            Assertions.assertNull((Object)coordinator.getEventBuffer()[0]);
        }
    }

    private String mockWriteWithMetadata() {
        String instant = this.coordinator.getInstant();
        WriteMetadataEvent event = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, instant, "par1", true, 0.1);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event);
        this.coordinator.notifyCheckpointComplete(0L);
        return instant;
    }

    private static WriteMetadataEvent createOperatorEvent(int taskId, String instant, String partitionPath, boolean trackSuccessRecords, double failureFraction) {
        WriteStatus writeStatus = new WriteStatus(Boolean.valueOf(trackSuccessRecords), Double.valueOf(failureFraction));
        writeStatus.setPartitionPath(partitionPath);
        HoodieWriteStat writeStat = new HoodieWriteStat();
        writeStat.setPartitionPath(partitionPath);
        writeStat.setFileId("fileId123");
        writeStat.setPath("path123");
        writeStat.setFileSizeInBytes(123L);
        writeStat.setTotalWriteBytes(123L);
        writeStat.setNumWrites(1L);
        writeStatus.setStat(writeStat);
        return WriteMetadataEvent.builder().taskID(taskId).instantTime(instant).writeStatus(Collections.singletonList(writeStatus)).lastBatch(true).build();
    }

    private void reset() throws Exception {
        FileUtils.cleanDirectory((File)this.tempFile);
    }

    private void assertError(Runnable runnable, String message) {
        runnable.run();
        MatcherAssert.assertThat((Object)this.coordinator.getContext(), (Matcher)CoreMatchers.instanceOf(MockOperatorCoordinatorContext.class));
        MockOperatorCoordinatorContext context = (MockOperatorCoordinatorContext)this.coordinator.getContext();
        Assertions.assertTrue((boolean)context.isJobFailed(), (String)message);
    }

    private /* synthetic */ void lambda$testReceiveInvalidEvent$0(OperatorEvent event) {
        this.coordinator.handleEventFromOperator(0, event);
    }
}

