/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrier;
import org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierHandlerState;
import org.apache.flink.streaming.runtime.io.checkpointing.ChannelState;
import org.apache.flink.util.Preconditions;

final class AlternatingCollectingBarriersUnaligned
implements BarrierHandlerState {
    private final boolean alternating;
    private final ChannelState channelState;

    AlternatingCollectingBarriersUnaligned(boolean alternating, ChannelState channelState) {
        this.alternating = alternating;
        this.channelState = channelState;
    }

    @Override
    public BarrierHandlerState alignmentTimeout(BarrierHandlerState.Controller controller, CheckpointBarrier checkpointBarrier) {
        return this;
    }

    @Override
    public BarrierHandlerState announcementReceived(BarrierHandlerState.Controller controller, InputChannelInfo channelInfo, int sequenceNumber) throws IOException {
        this.channelState.getInputs()[channelInfo.getGateIdx()].convertToPriorityEvent(channelInfo.getInputChannelIdx(), sequenceNumber);
        return this;
    }

    @Override
    public BarrierHandlerState barrierReceived(BarrierHandlerState.Controller controller, InputChannelInfo channelInfo, CheckpointBarrier checkpointBarrier, boolean markChannelBlocked) throws CheckpointException, IOException {
        if (markChannelBlocked && !checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
            this.channelState.blockChannel(channelInfo);
        }
        if (controller.allBarriersReceived()) {
            return this.finishCheckpoint(checkpointBarrier.getId());
        }
        return this;
    }

    @Override
    public BarrierHandlerState abort(long cancelledId) throws IOException {
        return this.finishCheckpoint(cancelledId);
    }

    @Override
    public BarrierHandlerState endOfPartitionReceived(BarrierHandlerState.Controller controller, InputChannelInfo channelInfo) throws IOException, CheckpointException {
        this.channelState.channelFinished(channelInfo);
        if (controller.allBarriersReceived()) {
            Preconditions.checkState(controller.getPendingCheckpointBarrier() != null, "At least one barrier received in unaligned collecting barrier state.");
            return this.finishCheckpoint(controller.getPendingCheckpointBarrier().getId());
        }
        return this;
    }

    private BarrierHandlerState finishCheckpoint(long cancelledId) throws IOException {
        for (CheckpointableInput input : this.channelState.getInputs()) {
            input.checkpointStopped(cancelledId);
        }
        this.channelState.unblockAllChannels();
        if (this.alternating) {
            return new AlternatingWaitingForFirstBarrier(this.channelState.emptyState());
        }
        return new AlternatingWaitingForFirstBarrierUnaligned(false, this.channelState.emptyState());
    }
}

