package org.apache.beam.runners.fnexecution.control;

import java.util.Iterator;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.springframework.util.SystemPropertyUtils;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.class */
public class BundleCheckpointHandlers {

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers$StateAndTimerBundleCheckpointHandler.class */
    public static class StateAndTimerBundleCheckpointHandler<T> implements BundleCheckpointHandler {
        private final TimerInternalsFactory<T> timerInternalsFactory;
        private final StateInternalsFactory<T> stateInternalsFactory;
        private final Coder<WindowedValue<T>> residualCoder;
        private final Coder windowCoder;
        private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
        public static final String SDF_PREFIX = "sdf_checkpoint";

        public StateAndTimerBundleCheckpointHandler(TimerInternalsFactory<T> timerInternalsFactory, StateInternalsFactory<T> stateInternalsFactory, Coder<WindowedValue<T>> coder, Coder coder2) {
            this.residualCoder = coder;
            this.windowCoder = coder2;
            this.timerInternalsFactory = timerInternalsFactory;
            this.stateInternalsFactory = stateInternalsFactory;
        }

        public static boolean isSdfTimer(String str) {
            return str.startsWith(SDF_PREFIX);
        }

        private static String constructSdfCheckpointId(String str, int i) {
            return "sdf_checkpoint:" + str + SystemPropertyUtils.VALUE_SEPARATOR + i;
        }

        @Override // org.apache.beam.runners.fnexecution.control.BundleCheckpointHandler
        public void onCheckpoint(BeamFnApi.ProcessBundleResponse processBundleResponse) {
            String id = this.idGenerator.getId();
            for (int i = 0; i < processBundleResponse.getResidualRootsCount(); i++) {
                BeamFnApi.DelayedBundleApplication residualRoots = processBundleResponse.getResidualRoots(i);
                if (residualRoots.hasApplication()) {
                    String constructSdfCheckpointId = constructSdfCheckpointId(id, i);
                    try {
                        WindowedValue windowedValue = (WindowedValue) CoderUtils.decodeFromByteArray(this.residualCoder, residualRoots.getApplication().getElement().toByteArray());
                        TimerInternals timerInternalsForKey = this.timerInternalsFactory.timerInternalsForKey(windowedValue.getValue());
                        StateInternals stateInternalsForKey = this.stateInternalsFactory.stateInternalsForKey(windowedValue.getValue());
                        Instant now = Instant.now();
                        if (residualRoots.hasRequestedTimeDelay()) {
                            now = now.plus(Duration.millis(residualRoots.getRequestedTimeDelay().getSeconds() * 1000));
                        }
                        long millis = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
                        if (residualRoots.getApplication().getOutputWatermarksMap().isEmpty()) {
                            millis = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
                        } else {
                            Iterator<Timestamp> it = residualRoots.getApplication().getOutputWatermarksMap().values().iterator();
                            while (it.hasNext()) {
                                millis = Math.min(millis, it.next().getSeconds() * 1000);
                            }
                        }
                        for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
                            StateNamespace window = StateNamespaces.window(this.windowCoder, boundedWindow);
                            timerInternalsForKey.setTimer(window, constructSdfCheckpointId, "", now, Instant.ofEpochMilli(millis), TimeDomain.PROCESSING_TIME);
                            ((ValueState) stateInternalsForKey.state(window, StateTags.value(constructSdfCheckpointId, this.residualCoder))).write(WindowedValue.of(windowedValue.getValue(), windowedValue.getTimestamp(), ImmutableList.of(boundedWindow), windowedValue.getPane()));
                        }
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to set timer/state for the residual", e);
                    }
                }
            }
        }
    }
}
