package org.apache.hudi.common.util.queue;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.class */
public abstract class BaseHoodieQueueBasedExecutor<I, O, E> implements HoodieExecutor<E> {
    private static final long TERMINATE_WAITING_TIME_SECS = 60;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final ExecutorService producerExecutorService;
    private final ExecutorService consumerExecutorService;
    protected final HoodieMessageQueue<I, O> queue;
    private final List<HoodieProducer<I>> producers;
    protected final Option<HoodieConsumer<O, E>> consumer;
    private CompletableFuture<Void> consumingFuture;
    private CompletableFuture<Void> producingFuture;

    public BaseHoodieQueueBasedExecutor(List<HoodieProducer<I>> list, Option<HoodieConsumer<O, E>> option, HoodieMessageQueue<I, O> hoodieMessageQueue, Runnable runnable) {
        this.queue = hoodieMessageQueue;
        this.producers = list;
        this.consumer = option;
        this.producerExecutorService = Executors.newFixedThreadPool(Math.max(1, list.size()), new CustomizedThreadFactory("executor-queue-producer", runnable));
        this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("executor-queue-consumer", runnable));
    }

    protected void doProduce(HoodieMessageQueue<I, O> hoodieMessageQueue, HoodieProducer<I> hoodieProducer) {
        this.logger.info("Starting producer, populating records into the queue");
        try {
            hoodieProducer.produce(hoodieMessageQueue);
            this.logger.info("Finished producing records into the queue");
        } catch (Exception e) {
            this.logger.error("Failed to produce records", e);
            hoodieMessageQueue.markAsFailed(e);
            throw new HoodieException("Failed to produce records", e);
        }
    }

    protected abstract void doConsume(HoodieMessageQueue<I, O> hoodieMessageQueue, HoodieConsumer<O, E> hoodieConsumer);

    protected void setUp() {
    }

    public final CompletableFuture<Void> startProducingAsync() {
        return FutureUtils.allOf((List) this.producers.stream().map(hoodieProducer -> {
            return CompletableFuture.supplyAsync(() -> {
                doProduce(this.queue, hoodieProducer);
                return (Void) null;
            }, this.producerExecutorService);
        }).collect(Collectors.toList())).thenApply(list -> {
            return (Void) null;
        }).whenComplete((r4, th) -> {
            this.producers.forEach((v0) -> {
                v0.close();
            });
            this.queue.seal();
        });
    }

    private CompletableFuture<Void> startConsumingAsync() {
        return (CompletableFuture) this.consumer.map(hoodieConsumer -> {
            return CompletableFuture.supplyAsync(() -> {
                doConsume(this.queue, hoodieConsumer);
                return (Void) null;
            }, this.consumerExecutorService);
        }).orElse(CompletableFuture.completedFuture(null));
    }

    @Override // org.apache.hudi.common.util.queue.HoodieExecutor
    public final boolean awaitTermination() {
        boolean interrupted = Thread.interrupted();
        boolean z = false;
        boolean z2 = false;
        try {
            z = this.producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
            z2 = this.consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return z && z2;
    }

    @Override // org.apache.hudi.common.util.queue.HoodieExecutor
    public final void shutdownNow() {
        if (this.producingFuture != null) {
            this.producingFuture.cancel(true);
        }
        if (this.consumingFuture != null) {
            this.consumingFuture.cancel(true);
        }
        this.producers.forEach((v0) -> {
            v0.close();
        });
        this.consumer.ifPresent((v0) -> {
            v0.finish();
        });
        this.producerExecutorService.shutdownNow();
        this.consumerExecutorService.shutdownNow();
    }

    public boolean isRunning() {
        return !this.queue.isEmpty();
    }

    @Override // org.apache.hudi.common.util.queue.HoodieExecutor
    public E execute() {
        try {
            ValidationUtils.checkState(this.consumer.isPresent());
            setUp();
            this.consumingFuture = startConsumingAsync();
            this.producingFuture = startProducingAsync();
            return (E) FutureUtils.allOf(Arrays.asList(this.producingFuture, this.consumingFuture)).whenComplete((list, th) -> {
                this.queue.close();
            }).thenApply(list2 -> {
                return this.consumer.get().finish();
            }).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            if (this.queue.getThrowable() != null) {
                throw new HoodieException(this.queue.getThrowable());
            }
            throw new HoodieException(e);
        }
    }
}
