/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.util.FluentBackoff;
import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.util.TimeUtil;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataflowPipelineJob
implements PipelineResult {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
    private String jobId;
    private String projectId;
    private Dataflow dataflowClient;
    @Nullable
    private PipelineResult.State terminalState = null;
    @Nullable
    private DataflowPipelineJob replacedByJob = null;
    private DataflowAggregatorTransforms aggregatorTransforms;
    private List<MetricUpdate> terminalMetricUpdates;
    static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds((long)2L);
    static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds((long)2L);
    static final double DEFAULT_BACKOFF_EXPONENT = 1.5;
    static final int MESSAGES_POLLING_RETRIES = 11;
    static final int STATUS_POLLING_RETRIES = 4;
    private static final FluentBackoff MESSAGES_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(MESSAGES_POLLING_INTERVAL).withMaxRetries(11).withExponent(1.5);
    protected static final FluentBackoff STATUS_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(STATUS_POLLING_INTERVAL).withMaxRetries(4).withExponent(1.5);

    public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient, DataflowAggregatorTransforms aggregatorTransforms) {
        this.projectId = projectId;
        this.jobId = jobId;
        this.dataflowClient = dataflowClient;
        this.aggregatorTransforms = aggregatorTransforms;
    }

    public String getJobId() {
        return this.jobId;
    }

    public String getProjectId() {
        return this.projectId;
    }

    public DataflowPipelineJob getReplacedByJob() {
        if (this.terminalState == null) {
            throw new IllegalStateException("getReplacedByJob() called before job terminated");
        }
        if (this.replacedByJob == null) {
            throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
        }
        return this.replacedByJob;
    }

    public Dataflow getDataflowClient() {
        return this.dataflowClient;
    }

    @Nullable
    public PipelineResult.State waitToFinish(long timeToWait, TimeUnit timeUnit, MonitoringUtil.JobMessagesHandler messageHandler) throws IOException, InterruptedException {
        Duration duration = Duration.millis((long)timeUnit.toMillis(timeToWait));
        return this.waitToFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
    }

    @Nullable
    @VisibleForTesting
    PipelineResult.State waitToFinish(Duration duration, MonitoringUtil.JobMessagesHandler messageHandler, Sleeper sleeper, NanoClock nanoClock) throws IOException, InterruptedException {
        PipelineResult.State state;
        MonitoringUtil monitor = new MonitoringUtil(this.projectId, this.dataflowClient);
        long lastTimestamp = 0L;
        BackOff backoff = !duration.isLongerThan((ReadableDuration)Duration.ZERO) ? MESSAGES_BACKOFF_FACTORY.backoff() : MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff();
        long startNanos = nanoClock.nanoTime();
        do {
            boolean hasError;
            boolean bl = hasError = (state = this.getStateWithRetries(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff(), sleeper)) == PipelineResult.State.UNKNOWN;
            if (messageHandler != null && !hasError) {
                try {
                    ArrayList<JobMessage> allMessages = monitor.getJobMessages(this.jobId, lastTimestamp);
                    if (!allMessages.isEmpty()) {
                        lastTimestamp = TimeUtil.fromCloudTime(((JobMessage)allMessages.get(allMessages.size() - 1)).getTime()).getMillis();
                        messageHandler.process(allMessages);
                    }
                }
                catch (GoogleJsonResponseException | SocketTimeoutException e) {
                    hasError = true;
                    LOG.warn("There were problems getting current job messages: {}.", (Object)e.getMessage());
                    LOG.debug("Exception information:", e);
                }
            }
            if (hasError) continue;
            if (state.isTerminal()) {
                return state;
            }
            backoff.reset();
            if (!duration.isLongerThan((ReadableDuration)Duration.ZERO)) continue;
            long nanosConsumed = nanoClock.nanoTime() - startNanos;
            Duration consumed = Duration.millis((long)((nanosConsumed + 999999L) / 1000000L));
            Duration remaining = duration.minus((ReadableDuration)consumed);
            backoff = remaining.isLongerThan((ReadableDuration)Duration.ZERO) ? MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff() : BackOff.STOP_BACKOFF;
        } while (BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff));
        LOG.warn("No terminal state was returned.  State value {}", (Object)state);
        return null;
    }

    public void cancel() throws IOException {
        Job content = new Job();
        content.setProjectId(this.projectId);
        content.setId(this.jobId);
        content.setRequestedState("JOB_STATE_CANCELLED");
        this.dataflowClient.projects().jobs().update(this.projectId, this.jobId, content).execute();
    }

    @Override
    public PipelineResult.State getState() {
        if (this.terminalState != null) {
            return this.terminalState;
        }
        return this.getStateWithRetries(STATUS_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
    }

    @VisibleForTesting
    PipelineResult.State getStateWithRetries(BackOff attempts, Sleeper sleeper) {
        if (this.terminalState != null) {
            return this.terminalState;
        }
        try {
            Job job = this.getJobWithRetries(attempts, sleeper);
            return MonitoringUtil.toState(job.getCurrentState());
        }
        catch (IOException exn) {
            return PipelineResult.State.UNKNOWN;
        }
    }

    private Job getJobWithRetries(BackOff backoff, Sleeper sleeper) throws IOException {
        while (true) {
            try {
                Job job = (Job)this.dataflowClient.projects().jobs().get(this.projectId, this.jobId).execute();
                PipelineResult.State currentState = MonitoringUtil.toState(job.getCurrentState());
                if (currentState.isTerminal()) {
                    this.terminalState = currentState;
                    this.replacedByJob = new DataflowPipelineJob(this.getProjectId(), job.getReplacedByJobId(), this.dataflowClient, this.aggregatorTransforms);
                }
                return job;
            }
            catch (IOException exn) {
                LOG.warn("There were problems getting current job status: {}.", (Object)exn.getMessage());
                LOG.debug("Exception information:", (Throwable)exn);
                if (this.nextBackOff(sleeper, backoff)) continue;
                throw exn;
            }
            break;
        }
    }

    private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
        try {
            return BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff);
        }
        catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator) throws AggregatorRetrievalException {
        try {
            return new MapAggregatorValues<OutputT>(this.fromMetricUpdates(aggregator));
        }
        catch (IOException e) {
            throw new AggregatorRetrievalException("IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
        }
    }

    private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator) throws IOException {
        if (this.aggregatorTransforms.contains(aggregator)) {
            List metricUpdates;
            if (this.terminalMetricUpdates != null) {
                metricUpdates = this.terminalMetricUpdates;
            } else {
                boolean terminal = this.getState().isTerminal();
                JobMetrics jobMetrics = (JobMetrics)this.dataflowClient.projects().jobs().getMetrics(this.projectId, this.jobId).execute();
                metricUpdates = jobMetrics.getMetrics();
                if (terminal && jobMetrics.getMetrics() != null) {
                    this.terminalMetricUpdates = metricUpdates;
                }
            }
            return DataflowMetricUpdateExtractor.fromMetricUpdates(aggregator, this.aggregatorTransforms, metricUpdates);
        }
        throw new IllegalArgumentException("Aggregator " + aggregator + " is not used in this pipeline");
    }
}

