/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import io.hops.hudi.com.codahale.metrics.Timer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.AbstractHoodieClient;
import org.apache.hudi.client.AsyncCleanerService;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I, K, O>
extends AbstractHoodieClient {
    protected static final String LOOKUP_STR = "lookup";
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
    protected final transient HoodieMetrics metrics = new HoodieMetrics(this.config, this.config.getTableName());
    private final transient HoodieIndex<T, I, K, O> index;
    protected transient Timer.Context writeTimer = null;
    protected transient Timer.Context compactionTimer;
    protected transient Timer.Context clusteringTimer;
    private transient WriteOperationType operationType;
    private transient HoodieWriteCommitCallback commitCallback;
    private transient AsyncCleanerService asyncCleanerService;
    protected final boolean rollbackPending;

    public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
        this(context, clientConfig, false);
    }

    public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
        this(context, writeConfig, rollbackPending, Option.empty());
    }

    public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, Option<EmbeddedTimelineService> timelineService) {
        super(context, writeConfig, timelineService);
        this.rollbackPending = rollbackPending;
        this.index = this.createIndex(writeConfig);
    }

    protected abstract HoodieIndex<T, I, K, O> createIndex(HoodieWriteConfig var1);

    public void setOperationType(WriteOperationType operationType) {
        this.operationType = operationType;
    }

    public WriteOperationType getOperationType() {
        return this.operationType;
    }

    public boolean commit(String instantTime, O writeStatuses) {
        return this.commit(instantTime, writeStatuses, Option.empty());
    }

    public boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata) {
        HoodieTableMetaClient metaClient = this.createMetaClient(false);
        String actionType = metaClient.getCommitActionType();
        return this.commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
    }

    public abstract boolean commit(String var1, O var2, Option<Map<String, String>> var3, String var4, Map<String, List<String>> var5);

    public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata, String commitActionType) {
        return this.commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
    }

    public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
        LOG.info((Object)("Committing " + instantTime + " action " + commitActionType));
        HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
        HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
        HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, this.operationType, this.config.getSchema(), commitActionType);
        this.finalizeWrite(table, instantTime, stats);
        try {
            activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
            this.postCommit(table, metadata, instantTime, extraMetadata);
            this.emitCommitMetrics(instantTime, metadata, commitActionType);
            LOG.info((Object)("Committed " + instantTime));
        }
        catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + instantTime, e);
        }
        if (this.config.writeCommitCallbackOn()) {
            if (null == this.commitCallback) {
                this.commitCallback = HoodieCommitCallbackFactory.create(this.config);
            }
            this.commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, this.config.getTableName(), this.config.getBasePath()));
        }
        return true;
    }

    protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig var1, Configuration var2);

    void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
        try {
            if (this.writeTimer != null) {
                long durationInMs = this.metrics.getDurationInMs(this.writeTimer.stop());
                this.metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime(), durationInMs, metadata, actionType);
                this.writeTimer = null;
            }
        }
        catch (ParseException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + instantTime + "Instant time is not of valid format", e);
        }
    }

    protected void syncTableMetadata() {
    }

    public abstract I filterExists(I var1);

    public void bootstrap(Option<Map<String, String>> extraMetadata) {
        if (this.rollbackPending) {
            this.rollBackInflightBootstrap();
        }
        HoodieTable<T, I, K, O> table = this.getTableAndInitCtx(WriteOperationType.UPSERT, "00000000000001");
        table.bootstrap(this.context, extraMetadata);
    }

    protected void rollBackInflightBootstrap() {
        LOG.info((Object)"Rolling back pending bootstrap if present");
        HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
        HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
        Option<String> instant = Option.fromJavaOptional(inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
        if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS, "00000000000002")) {
            LOG.info((Object)"Found pending bootstrap instants. Rolling them back");
            table.rollbackBootstrap(this.context, HoodieActiveTimeline.createNewInstantTime());
            LOG.info((Object)"Finished rolling back pending bootstrap");
        }
    }

    public abstract O upsert(I var1, String var2);

    public abstract O upsertPreppedRecords(I var1, String var2);

    public abstract O insert(I var1, String var2);

    public abstract O insertPreppedRecords(I var1, String var2);

    public abstract O bulkInsert(I var1, String var2);

    public abstract O bulkInsert(I var1, String var2, Option<BulkInsertPartitioner<I>> var3);

    public abstract O bulkInsertPreppedRecords(I var1, String var2, Option<BulkInsertPartitioner<I>> var3);

    public abstract O delete(K var1, String var2);

    protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
        this.setOperationType(writeOperationType);
        this.syncTableMetadata();
        this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
    }

    protected abstract O postWrite(HoodieWriteMetadata<O> var1, String var2, HoodieTable<T, I, K, O> var3);

    protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
        try {
            new MarkerFiles(table, instantTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
            if (this.config.isInlineCompaction()) {
                this.runAnyPendingCompactions(table);
                metadata.addMetadata("hoodie.compact.inline", "true");
                this.inlineCompact(extraMetadata);
            } else {
                metadata.addMetadata("hoodie.compact.inline", "false");
            }
            if (this.config.isInlineClustering()) {
                this.runAnyPendingClustering(table);
                metadata.addMetadata("hoodie.clustering.inline", "true");
                this.inlineCluster(extraMetadata);
            } else {
                metadata.addMetadata("hoodie.clustering.inline", "false");
            }
            HoodieTimelineArchiveLog<T, I, K, O> archiveLog = new HoodieTimelineArchiveLog<T, I, K, O>(this.config, table);
            archiveLog.archiveIfRequired(this.context);
            this.autoCleanOnCommit();
            this.syncTableMetadata();
        }
        catch (IOException ioe) {
            throw new HoodieIOException(ioe.getMessage(), ioe);
        }
    }

    protected void runAnyPendingCompactions(HoodieTable<T, I, K, O> table) {
        table.getActiveTimeline().getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().getInstants().forEach(instant -> {
            LOG.info((Object)("Running previously failed inflight compaction at instant " + instant));
            this.compact(instant.getTimestamp(), true);
        });
    }

    protected void runAnyPendingClustering(HoodieTable<T, I, K, O> table) {
        table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
            Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
            if (instantPlan.isPresent()) {
                LOG.info((Object)("Running pending clustering at instant " + instantPlan.get().getLeft()));
                this.cluster(instant.getTimestamp(), true);
            }
        });
    }

    protected void autoCleanOnCommit() {
        if (this.config.isAutoClean()) {
            if (this.config.isAsyncClean()) {
                LOG.info((Object)"Cleaner has been spawned already. Waiting for it to finish");
                AsyncCleanerService.waitForCompletion(this.asyncCleanerService);
                LOG.info((Object)"Cleaner has finished");
            } else {
                LOG.info((Object)"Auto cleaning is enabled. Running cleaner now");
                this.clean();
            }
        }
    }

    public void savepoint(String user, String comment) {
        HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
        if (table.getCompletedCommitsTimeline().empty()) {
            throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
        }
        String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
        LOG.info((Object)("Savepointing latest commit " + latestCommit));
        this.savepoint(latestCommit, user, comment);
    }

    public void savepoint(String instantTime, String user, String comment) {
        HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
        table.savepoint(this.context, instantTime, user, comment);
    }

    public void deleteSavepoint(String savepointTime) {
        HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
        SavepointHelpers.deleteSavepoint(table, savepointTime);
    }

    public void restoreToSavepoint(String savepointTime) {
        HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
        SavepointHelpers.validateSavepointPresence(table, savepointTime);
        this.restoreToInstant(savepointTime);
        SavepointHelpers.validateSavepointRestore(table, savepointTime);
    }

    public boolean rollback(String commitInstantTime) throws HoodieRollbackException {
        LOG.info((Object)("Begin rollback of instant " + commitInstantTime));
        String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
        Timer.Context timerContext = this.metrics.getRollbackCtx();
        try {
            HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
            Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants().filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)).findFirst());
            if (commitInstantOpt.isPresent()) {
                HoodieRollbackMetadata rollbackMetadata = table.rollback(this.context, rollbackInstantTime, commitInstantOpt.get(), true);
                if (timerContext != null) {
                    long durationInMs = this.metrics.getDurationInMs(timerContext.stop());
                    this.metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted().intValue());
                }
                return true;
            }
            LOG.warn((Object)("Cannot find instant " + commitInstantTime + " in the timeline, for rollback"));
            return false;
        }
        catch (Exception e) {
            throw new HoodieRollbackException("Failed to rollback " + this.config.getBasePath() + " commits " + commitInstantTime, e);
        }
    }

    public HoodieRestoreMetadata restoreToInstant(String instantTime) throws HoodieRestoreException {
        LOG.info((Object)("Begin restore to instant " + instantTime));
        String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
        Timer.Context timerContext = this.metrics.getRollbackCtx();
        try {
            HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
            HoodieRestoreMetadata restoreMetadata = table.restore(this.context, restoreInstantTime, instantTime);
            if (timerContext != null) {
                long durationInMs = this.metrics.getDurationInMs(timerContext.stop());
                long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream().flatMap(Collection::stream).mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted).sum();
                this.metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
            }
            return restoreMetadata;
        }
        catch (Exception e) {
            throw new HoodieRestoreException("Failed to restore to " + instantTime, e);
        }
    }

    public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
        LOG.info((Object)"Cleaner started");
        Timer.Context timerContext = this.metrics.getCleanCtx();
        HoodieCleanMetadata metadata = this.createTable(this.config, this.hadoopConf).clean(this.context, cleanInstantTime);
        if (timerContext != null && metadata != null) {
            long durationMs = this.metrics.getDurationInMs(timerContext.stop());
            this.metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
            LOG.info((Object)("Cleaned " + metadata.getTotalFilesDeleted() + " files Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + " cleanerElapsedMs" + durationMs));
        }
        return metadata;
    }

    public HoodieCleanMetadata clean() {
        return this.clean(HoodieActiveTimeline.createNewInstantTime());
    }

    public String startCommit() {
        if (this.rollbackPending) {
            this.rollbackPendingCommits();
        }
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        HoodieTableMetaClient metaClient = this.createMetaClient(true);
        this.startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
        return instantTime;
    }

    public void startCommitWithTime(String instantTime) {
        HoodieTableMetaClient metaClient = this.createMetaClient(true);
        this.startCommitWithTime(instantTime, metaClient.getCommitActionType(), metaClient);
    }

    public void startCommitWithTime(String instantTime, String actionType) {
        HoodieTableMetaClient metaClient = this.createMetaClient(true);
        this.startCommitWithTime(instantTime, actionType, metaClient);
    }

    private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
        if (this.rollbackPending) {
            this.rollbackPendingCommits();
        }
        this.startCommit(instantTime, actionType, metaClient);
    }

    private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
        LOG.info((Object)("Generate a new instant time: " + instantTime + " action: " + actionType));
        metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime), "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :" + latestPending + ",  Ingesting at " + instantTime));
        metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType, instantTime));
    }

    public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        return this.scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
    }

    public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        LOG.info((Object)("Scheduling compaction at instant time :" + instantTime));
        Option<HoodieCompactionPlan> plan = this.createTable(this.config, this.hadoopConf).scheduleCompaction(this.context, instantTime, extraMetadata);
        return plan.isPresent();
    }

    public O compact(String compactionInstantTime) {
        return this.compact(compactionInstantTime, this.config.shouldAutoCommit());
    }

    public abstract void commitCompaction(String var1, O var2, Option<Map<String, String>> var3) throws IOException;

    protected abstract void completeCompaction(HoodieCommitMetadata var1, O var2, HoodieTable<T, I, K, O> var3, String var4);

    public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
        table.rollback(this.context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
        table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
    }

    private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTable<T, I, K, O> table) {
        HoodieTimeline inflightTimelineWithReplaceCommit = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
        HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> {
            if (instant.getAction().equals("replacecommit")) {
                Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
                return !instantPlan.isPresent();
            }
            return true;
        });
        return inflightTimelineExcludeClusteringCommit;
    }

    private void rollbackPendingCommits() {
        HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
        HoodieTimeline inflightTimeline = this.getInflightTimelineExcludeCompactionAndClustering(table);
        List commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
        for (String commit : commits) {
            if (HoodieTimeline.compareTimestamps(commit, HoodieTimeline.LESSER_THAN_OR_EQUALS, "00000000000002")) {
                this.rollBackInflightBootstrap();
                break;
            }
            this.rollback(commit);
        }
    }

    protected abstract O compact(String var1, boolean var2);

    protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
        Option<String> compactionInstantTimeOpt = this.scheduleCompaction(extraMetadata);
        compactionInstantTimeOpt.ifPresent(compactionInstantTime -> this.compact((String)compactionInstantTime, true));
        return compactionInstantTimeOpt;
    }

    public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        return this.scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
    }

    public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        LOG.info((Object)("Scheduling clustering at instant time :" + instantTime));
        Option<HoodieClusteringPlan> plan = this.createTable(this.config, this.hadoopConf).scheduleClustering(this.context, instantTime, extraMetadata);
        return plan.isPresent();
    }

    public abstract HoodieWriteMetadata<O> cluster(String var1, boolean var2);

    protected Option<String> inlineCluster(Option<Map<String, String>> extraMetadata) {
        Option<String> clusteringInstantOpt = this.scheduleClustering(extraMetadata);
        clusteringInstantOpt.ifPresent(clusteringInstant -> this.cluster((String)clusteringInstant, true));
        return clusteringInstantOpt;
    }

    protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
        table.rollback(this.context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
        table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
    }

    protected void finalizeWrite(HoodieTable<T, I, K, O> table, String instantTime, List<HoodieWriteStat> stats) {
        try {
            Timer.Context finalizeCtx = this.metrics.getFinalizeCtx();
            table.finalizeWrite(this.context, instantTime, stats);
            if (finalizeCtx != null) {
                Option<Long> durationInMs = Option.of(this.metrics.getDurationInMs(finalizeCtx.stop()));
                durationInMs.ifPresent(duration -> {
                    LOG.info((Object)("Finalize write elapsed time (milliseconds): " + duration));
                    this.metrics.updateFinalizeWriteMetrics((long)duration, stats.size());
                });
            }
        }
        catch (HoodieIOException ioe) {
            throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
        }
    }

    public HoodieMetrics getMetrics() {
        return this.metrics;
    }

    public HoodieIndex<T, I, K, O> getIndex() {
        return this.index;
    }

    protected abstract HoodieTable<T, I, K, O> getTableAndInitCtx(WriteOperationType var1, String var2);

    protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
        try {
            HoodieCommitMetadata commitMetadata;
            HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
            Option<HoodieInstant> lastInstant = activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType()) || s.getAction().equals("replacecommit")).lastInstant();
            if (lastInstant.isPresent()) {
                commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
                if (!commitMetadata.getExtraMetadata().containsKey("schema")) {
                    throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
                }
            } else {
                throw new HoodieIOException("Deletes issued without any prior commits");
            }
            this.config.setSchema(commitMetadata.getExtraMetadata().get("schema"));
        }
        catch (IOException e) {
            throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
        }
    }

    @Override
    public void close() {
        AsyncCleanerService.forceShutdown(this.asyncCleanerService);
        this.asyncCleanerService = null;
        super.close();
        this.index.close();
    }
}

