/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.transaction.lock.models;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
import org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;

public class TestLockProviderHeartbeatManager {
    private ScheduledExecutorService mockScheduler;
    private Logger mockLogger;
    private ScheduledFuture<?> mockFuture;
    private HeartbeatManager manager;
    private static final String LOGGER_ID = "test-owner";
    private ScheduledExecutorService actualExecutorService;

    @BeforeEach
    void setUp() {
        this.mockScheduler = (ScheduledExecutorService)Mockito.mock(ScheduledExecutorService.class);
        this.mockLogger = (Logger)Mockito.mock(Logger.class);
        this.mockFuture = (ScheduledFuture)Mockito.mock(ScheduledFuture.class);
        this.actualExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "Heartbeat-Test-Thread");
            t.setDaemon(true);
            return t;
        });
    }

    @AfterEach
    void tearDown() throws Exception {
        if (this.manager != null) {
            this.manager.close();
            this.manager = null;
        }
        this.actualExecutorService.shutdownNow();
    }

    @Test
    void testStartHeartbeatSuccess() {
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.eq((long)100L), ArgumentMatchers.eq((long)100L), (TimeUnit)((Object)ArgumentMatchers.eq((Object)((Object)TimeUnit.MILLISECONDS))))).thenAnswer(invocation -> this.mockFuture);
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
    }

    @Test
    void testStartHeartbeatAlreadyRunning() {
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)))).thenAnswer(invocation -> this.mockFuture);
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        Assertions.assertFalse((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Logger)Mockito.verify((Object)this.mockLogger)).warn("Owner {}: Heartbeat is already running.", (Object)LOGGER_ID);
    }

    @Test
    void testStartHeartbeatSchedulerException() {
        ((ScheduledExecutorService)Mockito.doThrow((Throwable[])new Throwable[]{new RejectedExecutionException("Scheduler failure")}).when((Object)this.mockScheduler)).scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)));
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        Assertions.assertFalse((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Logger)Mockito.verify((Object)this.mockLogger)).error((String)ArgumentMatchers.eq((Object)"Owner {}: Unable to schedule heartbeat task. {}"), ArgumentMatchers.eq((Object)LOGGER_ID), ArgumentMatchers.any(RejectedExecutionException.class));
    }

    @Test
    void testStopHeartbeatNeverStarted() {
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        Assertions.assertFalse((boolean)this.manager.stopHeartbeat(true));
        ((Logger)Mockito.verify((Object)this.mockLogger)).warn("Owner {}: No active heartbeat task to stop.", (Object)LOGGER_ID);
    }

    @Test
    void testStopHeartbeatAlreadyRequested() {
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)))).thenAnswer(invocation -> this.mockFuture);
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        Mockito.when((Object)this.mockFuture.cancel(true)).thenReturn((Object)true);
        Mockito.when((Object)this.mockFuture.isDone()).thenReturn((Object)false).thenReturn((Object)true);
        Assertions.assertTrue((boolean)this.manager.stopHeartbeat(true));
        Assertions.assertFalse((boolean)this.manager.stopHeartbeat(true));
        ((Logger)Mockito.verify((Object)this.mockLogger)).warn("Owner {}: No active heartbeat task to stop.", (Object)LOGGER_ID);
    }

    @Test
    void testHeartbeatUnableToAcquireSemaphore() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference t = new AtomicReference();
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)))).thenAnswer(invocation -> {
            Runnable task = (Runnable)invocation.getArgument(0);
            t.set(new Thread(() -> {
                task.run();
                latch.countDown();
            }));
            return this.mockFuture;
        });
        Mockito.when((Object)this.mockFuture.cancel(true)).thenReturn((Object)true);
        Semaphore semaphore = (Semaphore)Mockito.mock(Semaphore.class);
        Mockito.when((Object)semaphore.tryAcquire()).thenReturn((Object)false);
        Mockito.when((Object)semaphore.tryAcquire(ArgumentMatchers.eq((long)LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS), (TimeUnit)((Object)ArgumentMatchers.eq((Object)((Object)TimeUnit.MILLISECONDS))))).thenReturn((Object)true);
        this.manager = new LockProviderHeartbeatManager(LOGGER_ID, this.mockScheduler, 100L, LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, () -> true, semaphore, this.mockLogger);
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Thread)t.get()).start();
        Assertions.assertTrue((boolean)latch.await(1000L, TimeUnit.MILLISECONDS));
        Assertions.assertTrue((boolean)this.manager.stopHeartbeat(true));
        ((Logger)Mockito.verify((Object)this.mockLogger)).error("Owner {}: Heartbeat semaphore should be acquirable at the start of every heartbeat!", (Object)LOGGER_ID);
        Assertions.assertFalse((boolean)this.manager.hasActiveHeartbeat());
    }

    @Test
    void testStopHeartbeatMockSuccessfulCancel() {
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)))).thenAnswer(invocation -> this.mockFuture);
        Mockito.when((Object)this.mockFuture.cancel(true)).thenReturn((Object)true);
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        this.manager.startHeartbeatForThread(Thread.currentThread());
        Mockito.when((Object)this.mockFuture.isDone()).thenReturn((Object)false).thenReturn((Object)true);
        Assertions.assertTrue((boolean)this.manager.stopHeartbeat(true));
    }

    @Test
    void testHeartbeatTaskHandlesInterrupt() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference t = new AtomicReference();
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)))).thenAnswer(invocation -> {
            Runnable task = (Runnable)invocation.getArgument(0);
            t.set(new Thread(() -> {
                task.run();
                latch.countDown();
            }));
            return this.mockFuture;
        });
        Mockito.when((Object)this.mockFuture.cancel(true)).thenReturn((Object)true);
        this.manager = this.createDefaultManagerWithMocks(() -> false);
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Thread)t.get()).start();
        ((Thread)t.get()).interrupt();
        Assertions.assertTrue((boolean)latch.await(500L, TimeUnit.MILLISECONDS), (String)"Heartbeat task did not run in time");
        Assertions.assertFalse((boolean)this.manager.stopHeartbeat(true));
        ((Logger)Mockito.verify((Object)this.mockLogger)).warn("Owner {}: No active heartbeat task to stop.", (Object)LOGGER_ID);
        ((Logger)Mockito.verify((Object)this.mockLogger)).debug("Owner {}: Heartbeat started with interval: {} ms", (Object)LOGGER_ID, (Object)100L);
        ((Logger)Mockito.verify((Object)this.mockLogger)).info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.", (Object)LOGGER_ID, (Object)true);
        Assertions.assertFalse((boolean)this.manager.hasActiveHeartbeat());
    }

    @Test
    void testHeartbeatTaskNullWriter() {
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.manager.startHeartbeatForThread(null));
    }

    @Test
    void testHeartbeatTaskImmediateDeadMonitoringThread() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference t = new AtomicReference();
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)))).thenAnswer(invocation -> {
            Runnable task = (Runnable)invocation.getArgument(0);
            t.set(new Thread(() -> {
                task.run();
                latch.countDown();
            }));
            return this.mockFuture;
        });
        Mockito.when((Object)this.mockFuture.cancel(false)).thenReturn((Object)false);
        Thread deadThread = new Thread(() -> {});
        deadThread.start();
        deadThread.join();
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(deadThread));
        ((Thread)t.get()).start();
        Assertions.assertTrue((boolean)latch.await(500L, TimeUnit.MILLISECONDS), (String)"Heartbeat task did not run in time");
        ((Logger)Mockito.verify((Object)this.mockLogger)).warn("Owner {}: Monitored thread is no longer alive.", (Object)LOGGER_ID);
        ((Logger)Mockito.verify((Object)this.mockLogger)).info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.", (Object)LOGGER_ID, (Object)false);
        Assertions.assertFalse((boolean)this.manager.hasActiveHeartbeat());
    }

    @Test
    void testHeartbeatTaskRenewalException() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference t = new AtomicReference();
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)))).thenAnswer(invocation -> {
            Runnable task = (Runnable)invocation.getArgument(0);
            t.set(new Thread(() -> {
                task.run();
                latch.countDown();
            }));
            return this.mockFuture;
        });
        this.manager = this.createDefaultManagerWithMocks(() -> {
            throw new RuntimeException("Renewal error");
        });
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Thread)t.get()).start();
        Assertions.assertTrue((boolean)latch.await(500L, TimeUnit.MILLISECONDS), (String)"Heartbeat task did not run in time");
        ((Logger)Mockito.verify((Object)this.mockLogger)).error((String)ArgumentMatchers.eq((Object)"Owner {}: Heartbeat function threw exception {}"), ArgumentMatchers.eq((Object)LOGGER_ID), ArgumentMatchers.any(RuntimeException.class));
        Assertions.assertFalse((boolean)this.manager.hasActiveHeartbeat());
    }

    @Test
    void testHeartbeatStopWaitsForHeartbeatTaskToFinish() throws InterruptedException {
        CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
        this.manager = this.createDefaultManagerWithRealExecutor(() -> {
            try {
                Assertions.assertTrue((boolean)stopHeartbeatTaskLatch.await(500L, TimeUnit.MILLISECONDS));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return true;
        });
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        CountDownLatch finishStopHeartbeatLatch = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            Assertions.assertTrue((boolean)this.manager.stopHeartbeat(false));
            finishStopHeartbeatLatch.countDown();
        });
        t.start();
        stopHeartbeatTaskLatch.countDown();
        Assertions.assertTrue((boolean)finishStopHeartbeatLatch.await(500L, TimeUnit.MILLISECONDS), (String)"Stop heartbeat task did not finish.");
        Assertions.assertFalse((boolean)this.manager.hasActiveHeartbeat());
        ((Logger)Mockito.verify((Object)this.mockLogger)).debug("Owner {}: Heartbeat task successfully terminated.", (Object)LOGGER_ID);
    }

    @Test
    void testHeartbeatUnableToStopHeartbeatTask() throws InterruptedException {
        CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
        CountDownLatch heartbeatStartedLatch = new CountDownLatch(1);
        this.manager = new LockProviderHeartbeatManager(LOGGER_ID, this.actualExecutorService, 100L, 5000L, () -> {
            try {
                heartbeatStartedLatch.countDown();
                Assertions.assertTrue((boolean)stopHeartbeatTaskLatch.await(10000L, TimeUnit.MILLISECONDS));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return true;
        }, new Semaphore(1), this.mockLogger);
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        CountDownLatch stopHeartbeatLatch = new CountDownLatch(1);
        Thread stopHeartbeatThread = new Thread(() -> {
            Assertions.assertFalse((boolean)this.manager.stopHeartbeat(false));
            stopHeartbeatLatch.countDown();
        });
        Assertions.assertTrue((boolean)heartbeatStartedLatch.await(500L, TimeUnit.MILLISECONDS), (String)"Heartbeat task did not start.");
        stopHeartbeatThread.start();
        Assertions.assertTrue((boolean)stopHeartbeatLatch.await(7000L, TimeUnit.MILLISECONDS), (String)"Stop heartbeat task did not finish.");
        Assertions.assertTrue((boolean)this.manager.hasActiveHeartbeat());
        ((Logger)Mockito.verify((Object)this.mockLogger)).error("Owner {}: Heartbeat is still in flight!", (Object)LOGGER_ID);
        stopHeartbeatTaskLatch.countDown();
    }

    @Test
    void testHeartbeatInterruptStopHeartbeatTask() throws InterruptedException {
        CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
        CountDownLatch heartbeatStartedLatch = new CountDownLatch(1);
        this.manager = new LockProviderHeartbeatManager(LOGGER_ID, this.actualExecutorService, 100L, 5000L, () -> {
            try {
                heartbeatStartedLatch.countDown();
                Assertions.assertTrue((boolean)stopHeartbeatTaskLatch.await(10000L, TimeUnit.MILLISECONDS));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return true;
        }, new Semaphore(1), this.mockLogger);
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        CountDownLatch stopHeartbeatLatch = new CountDownLatch(1);
        Thread stopHeartbeatThread = new Thread(() -> {
            Assertions.assertFalse((boolean)this.manager.stopHeartbeat(false));
            stopHeartbeatLatch.countDown();
        });
        Assertions.assertTrue((boolean)heartbeatStartedLatch.await(500L, TimeUnit.MILLISECONDS), (String)"Heartbeat task did not start.");
        stopHeartbeatThread.start();
        stopHeartbeatThread.interrupt();
        Assertions.assertTrue((boolean)stopHeartbeatLatch.await(7000L, TimeUnit.MILLISECONDS), (String)"Stop heartbeat task did not finish.");
        Assertions.assertTrue((boolean)this.manager.hasActiveHeartbeat());
        ((Logger)Mockito.verify((Object)this.mockLogger)).warn("Owner {}: Interrupted while waiting for heartbeat termination.", (Object)LOGGER_ID);
        stopHeartbeatTaskLatch.countDown();
    }

    @Test
    void testHeartbeatTaskValidateStop() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        this.manager = this.createDefaultManagerWithRealExecutor(() -> {
            latch.countDown();
            return true;
        });
        Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
        Assertions.assertTrue((boolean)latch.await(2000L, TimeUnit.MILLISECONDS), (String)"Heartbeat did not renew twice in time");
        Assertions.assertEquals((long)0L, (long)latch.getCount(), (String)"Heartbeat did not execute exactly twice");
        Assertions.assertTrue((boolean)this.manager.hasActiveHeartbeat());
        Assertions.assertTrue((boolean)this.manager.stopHeartbeat(false));
        Assertions.assertFalse((boolean)this.manager.hasActiveHeartbeat());
    }

    @Test
    void testDefaultManagerRapidStartStop1Ms() {
        this.manager = new LockProviderHeartbeatManager(LOGGER_ID, 1L, () -> true);
        for (int i = 0; i < 100; ++i) {
            Assertions.assertTrue((boolean)this.manager.startHeartbeatForThread(Thread.currentThread()));
            Assertions.assertTrue((boolean)this.manager.hasActiveHeartbeat());
            Assertions.assertTrue((boolean)this.manager.stopHeartbeat(true));
            Assertions.assertFalse((boolean)this.manager.hasActiveHeartbeat());
        }
    }

    @Test
    void testClose() throws Exception {
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        this.manager.close();
        Assertions.assertFalse((boolean)this.manager.hasActiveHeartbeat());
    }

    @Test
    void testClose_StopsHeartbeatAndShutsDownScheduler() throws Exception {
        Mockito.when((Object)this.mockScheduler.awaitTermination(5L, TimeUnit.SECONDS)).thenReturn((Object)true);
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        this.manager.close();
        ((ScheduledExecutorService)Mockito.verify((Object)this.mockScheduler)).shutdown();
        ((ScheduledExecutorService)Mockito.verify((Object)this.mockScheduler, (VerificationMode)Mockito.never())).shutdownNow();
    }

    @Test
    void testClose_ForceShutdownWhenTerminationTimesOut() throws Exception {
        Mockito.when((Object)this.mockScheduler.awaitTermination(5L, TimeUnit.SECONDS)).thenReturn((Object)false);
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        this.manager.close();
        ((ScheduledExecutorService)Mockito.verify((Object)this.mockScheduler)).shutdown();
        ((ScheduledExecutorService)Mockito.verify((Object)this.mockScheduler)).shutdownNow();
    }

    @Test
    void testClose_HandlesInterruptedException() throws Exception {
        Mockito.when((Object)this.mockScheduler.awaitTermination(5L, TimeUnit.SECONDS)).thenThrow(new Throwable[]{new InterruptedException()});
        this.manager = this.createDefaultManagerWithMocks(() -> true);
        this.manager.close();
        ((ScheduledExecutorService)Mockito.verify((Object)this.mockScheduler)).shutdown();
        ((ScheduledExecutorService)Mockito.verify((Object)this.mockScheduler)).shutdownNow();
        Assertions.assertTrue((boolean)Thread.currentThread().isInterrupted(), (String)"Thread should be interrupted after exception handling");
    }

    private LockProviderHeartbeatManager createDefaultManagerWithMocks(Supplier<Boolean> heartbeatFunc) {
        return new LockProviderHeartbeatManager(LOGGER_ID, this.mockScheduler, 100L, LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, heartbeatFunc, new Semaphore(1), this.mockLogger);
    }

    private LockProviderHeartbeatManager createDefaultManagerWithRealExecutor(Supplier<Boolean> heartbeatFunc) {
        return new LockProviderHeartbeatManager(LOGGER_ID, this.actualExecutorService, 100L, LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, heartbeatFunc, new Semaphore(1), this.mockLogger);
    }
}

