package org.opensearch.jobscheduler.sweeper;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.ActionListener;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.Murmur3HashFunction;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.component.LifecycleListener;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.jobscheduler.JobSchedulerSettings;
import org.opensearch.jobscheduler.ScheduledJobProvider;
import org.opensearch.jobscheduler.scheduler.JobScheduler;
import org.opensearch.jobscheduler.spi.JobDocVersion;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.jobscheduler.utils.VisibleForTesting;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

/* loaded from: input_file:org/opensearch/jobscheduler/sweeper/JobSweeper.class */
public class JobSweeper extends LifecycleListener implements IndexingOperationListener, ClusterStateListener {
    private static final Logger log = LogManager.getLogger(JobSweeper.class);
    // Client used by sweepShard() to search job documents.
    private Client client;
    // Source of cluster state, the local node and the dynamic-settings registry.
    private ClusterService clusterService;
    // Schedules the periodic full-sweep trigger (see initBackgroundSweep()).
    private ThreadPool threadPool;
    // Job index name -> provider supplying that index's job parser and runner.
    private Map<String, ScheduledJobProvider> indexToProviders;
    // Registry handed to the XContent parser when job documents are parsed in sweep().
    private NamedXContentRegistry xContentRegistry;
    // Handle of the currently scheduled periodic trigger; null until initBackgroundSweep() has run.
    private Scheduler.Cancellable scheduledFullSweep;
    // Single-thread executor on which index and full sweeps are run.
    private ExecutorService fullSweepExecutor;
    // Per shard: job id -> document version last recorded by sweep().
    private ConcurrentHashMap<ShardId, ConcurrentHashMap<String, JobDocVersion>> sweptJobs;
    // Scheduler that jobs are scheduled on / descheduled from.
    private JobScheduler scheduler;
    // Used by postDelete() to delete the lock of a deleted job.
    private LockService lockService;
    // System.nanoTime() taken at construction and again at the end of every sweepAllJobIndices() run.
    private volatile long lastFullSweepTimeNano = System.nanoTime();
    // The fields below are read from JobSchedulerSettings in loadSettings() and
    // replaced by the consumers registered in addConfigListeners().
    private volatile TimeValue sweepPeriod;
    private volatile Integer sweepPageMaxSize;
    private volatile TimeValue sweepSearchTimeout;
    private volatile TimeValue sweepSearchBackoffMillis;
    private volatile Integer sweepSearchBackoffRetryCount;
    // Derived from sweepSearchBackoffMillis and sweepSearchBackoffRetryCount by updateRetryPolicy().
    private volatile BackoffPolicy sweepSearchBackoff;
    private volatile Double jitterLimit;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opensearch/jobscheduler/sweeper/JobSweeper$ShardNodes.class */
    /**
     * Consistent-hash ring over the nodes that hold an active copy of a shard.
     * Every node id is placed on the ring {@link #VIRTUAL_NODE_COUNT} times; a job id
     * belongs to the node whose ring position is the first one strictly greater than
     * the job id's hash, wrapping around to the lowest position.
     */
    public static class ShardNodes {
        private static final int VIRTUAL_NODE_COUNT = 100;
        String localNodeId;
        Collection<String> activeShardNodeIds;
        private TreeMap<Integer, String> circle = new TreeMap<>();

        /**
         * @param str        id of the local node
         * @param collection ids of the nodes to place on the ring
         */
        ShardNodes(String str, Collection<String> collection) {
            this.localNodeId = str;
            this.activeShardNodeIds = collection;
            for (String nodeId : collection) {
                int replica = 0;
                while (replica < VIRTUAL_NODE_COUNT) {
                    // Each virtual node is keyed by the hash of "<nodeId><replica index>".
                    final int position = Murmur3HashFunction.hash(nodeId + replica);
                    this.circle.put(position, nodeId);
                    replica++;
                }
            }
        }

        /**
         * @param str job id to look up
         * @return {@code true} when the ring maps {@code str} to the local node;
         *         always {@code false} for an empty ring
         */
        boolean isOwningNode(String str) {
            if (this.circle.isEmpty()) {
                return false;
            }
            final int jobHash = Murmur3HashFunction.hash(str);
            Map.Entry<Integer, String> successor = this.circle.higherEntry(jobHash);
            if (successor == null) {
                // Past the highest position: wrap around to the start of the ring.
                successor = this.circle.firstEntry();
            }
            return this.localNodeId.equals(successor.getValue());
        }
    }

    /**
     * Creates a sweeper: stores the collaborators, loads the sweep settings, registers
     * the dynamic-setting consumers and creates the single-thread sweep executor and
     * the (initially empty) swept-job registry.
     *
     * @param settings              settings the initial sweep configuration is read from
     * @param client                client used to search job documents
     * @param clusterService        provides cluster state and the cluster settings registry
     * @param threadPool            used to schedule the periodic full sweep
     * @param namedXContentRegistry registry used when parsing job documents
     * @param map                   job index name to provider
     * @param jobScheduler          scheduler jobs are (de)scheduled on
     * @param lockService           used to delete locks of deleted jobs
     */
    public JobSweeper(
        Settings settings,
        Client client,
        ClusterService clusterService,
        ThreadPool threadPool,
        NamedXContentRegistry namedXContentRegistry,
        Map<String, ScheduledJobProvider> map,
        JobScheduler jobScheduler,
        LockService lockService
    ) {
        // Collaborators.
        this.client = client;
        this.clusterService = clusterService;
        this.threadPool = threadPool;
        this.xContentRegistry = namedXContentRegistry;
        this.indexToProviders = map;
        this.scheduler = jobScheduler;
        this.lockService = lockService;

        // Configuration: initial values first, then consumers for later updates.
        loadSettings(settings);
        addConfigListeners();

        // Runtime state.
        this.fullSweepExecutor = Executors.newSingleThreadExecutor(
            OpenSearchExecutors.daemonThreadFactory("opendistro_job_sweeper")
        );
        this.sweptJobs = new ConcurrentHashMap<>();
    }

    /**
     * Reads every sweep-related setting from {@code settings} into the corresponding
     * field and then builds the search retry policy from the two backoff values.
     *
     * @param settings settings to read the values from
     */
    private void loadSettings(Settings settings) {
        // Sweep cadence and paging.
        sweepPeriod = (TimeValue) JobSchedulerSettings.SWEEP_PERIOD.get(settings);
        sweepPageMaxSize = (Integer) JobSchedulerSettings.SWEEP_PAGE_SIZE.get(settings);

        // Search timeout and retry inputs.
        sweepSearchTimeout = (TimeValue) JobSchedulerSettings.REQUEST_TIMEOUT.get(settings);
        sweepSearchBackoffMillis = (TimeValue) JobSchedulerSettings.SWEEP_BACKOFF_MILLIS.get(settings);
        sweepSearchBackoffRetryCount = (Integer) JobSchedulerSettings.SWEEP_BACKOFF_RETRY_COUNT.get(settings);

        jitterLimit = (Double) JobSchedulerSettings.JITTER_LIMIT.get(settings);

        // Must come last: it is derived from the two backoff fields assigned above.
        sweepSearchBackoff = updateRetryPolicy();
    }

    /**
     * Registers one update consumer per sweep setting so that changes to the cluster
     * settings are applied to the corresponding field.
     * <ul>
     *   <li>A new sweep period also re-creates the periodic full-sweep trigger.</li>
     *   <li>A new backoff delay or retry count also rebuilds the retry policy.</li>
     * </ul>
     * Each consumer logs the value it received. Time values are logged as-is rather than
     * converted to whole minutes, so sub-minute values are not reported as {@code 0}.
     */
    private void addConfigListeners() {
        this.clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.SWEEP_PERIOD, newPeriod -> {
            this.sweepPeriod = newPeriod;
            log.debug("Reinitializing background full sweep with period: {}", newPeriod);
            initBackgroundSweep();
        });
        this.clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.SWEEP_PAGE_SIZE, newPageSize -> {
            this.sweepPageMaxSize = newPageSize;
            log.debug("Setting background sweep page size: {}", newPageSize);
        });
        this.clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.REQUEST_TIMEOUT, newTimeout -> {
            this.sweepSearchTimeout = newTimeout;
            log.debug("Setting background sweep search timeout: {}", newTimeout);
        });
        this.clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.SWEEP_BACKOFF_MILLIS, newBackoff -> {
            this.sweepSearchBackoffMillis = newBackoff;
            this.sweepSearchBackoff = updateRetryPolicy();
            log.debug("Setting background sweep search backoff: {}", newBackoff.getMillis());
        });
        this.clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.SWEEP_BACKOFF_RETRY_COUNT, newRetryCount -> {
            this.sweepSearchBackoffRetryCount = newRetryCount;
            this.sweepSearchBackoff = updateRetryPolicy();
            log.debug("Setting background sweep search backoff retry count: {}", newRetryCount);
        });
        this.clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.JITTER_LIMIT, newJitterLimit -> {
            this.jitterLimit = newJitterLimit;
            log.debug("Setting background sweep jitter limit: {}", newJitterLimit);
        });
    }

    /**
     * Builds an exponential backoff policy from the current values of
     * {@code sweepSearchBackoffMillis} and {@code sweepSearchBackoffRetryCount}.
     *
     * @return the newly created policy; the caller is responsible for storing it
     */
    private BackoffPolicy updateRetryPolicy() {
        final TimeValue backoffDelay = this.sweepSearchBackoffMillis;
        final int retryCount = this.sweepSearchBackoffRetryCount.intValue();
        return BackoffPolicy.exponentialBackoff(backoffDelay, retryCount);
    }

    /**
     * Starts the periodic full sweep.
     * NOTE(review): presumably the LifecycleListener "after start" callback - confirm against the base class.
     */
    public void afterStart() {
        this.initBackgroundSweep();
    }

    /**
     * Cancels the periodic full-sweep trigger, if one has been scheduled.
     * NOTE(review): presumably the LifecycleListener "before stop" callback - confirm against the base class.
     */
    public void beforeStop() {
        if (this.scheduledFullSweep == null) {
            // Nothing was ever scheduled.
            return;
        }
        this.scheduledFullSweep.cancel();
    }

    /**
     * Shuts down the sweep executor; no further sweeps are accepted afterwards.
     * NOTE(review): presumably the LifecycleListener "before close" callback - confirm against the base class.
     */
    public void beforeClose() {
        final ExecutorService sweepExecutor = this.fullSweepExecutor;
        sweepExecutor.shutdown();
    }

    /**
     * Handles a finished index operation on a job document: when the operation
     * succeeded and the local node owns the document id among the nodes holding an
     * active copy of the shard, the document is handed to {@code sweep}.
     *
     * @param shardId     shard the document was indexed into
     * @param index       the index operation (document id and source)
     * @param indexResult outcome of the operation (result type, term, seqNo, version)
     */
    public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult indexResult) {
        if (indexResult.getResultType().equals(Engine.Result.Type.FAILURE)) {
            log.info("Indexing failed for job {} on index {}", index.id(), shardId.getIndexName());
            return;
        }

        final String localNodeId = this.clusterService.localNode().getId();
        final IndexShardRoutingTable shardCopies = this.clusterService.state().routingTable().shardRoutingTable(shardId);

        // Only nodes with an active copy of the shard take part in the ownership decision.
        final List<String> activeNodeIds = new ArrayList<>();
        for (ShardRouting copy : shardCopies) {
            if (copy.active()) {
                activeNodeIds.add(copy.currentNodeId());
            }
        }

        final ShardNodes shardNodes = new ShardNodes(localNodeId, activeNodeIds);
        if (!shardNodes.isOwningNode(index.id())) {
            return;
        }
        final JobDocVersion indexedVersion = new JobDocVersion(indexResult.getTerm(), indexResult.getSeqNo(), indexResult.getVersion());
        sweep(shardId, index.id(), index.source(), indexedVersion);
    }

    /**
     * Handles a finished delete operation on a job document.
     * <ul>
     *   <li>Failed delete: only logs the version currently recorded for the job.</li>
     *   <li>Successful delete of a scheduled job: deschedules it and asynchronously
     *       deletes its lock.</li>
     * </ul>
     *
     * @param shardId      shard the document was deleted from
     * @param delete       the delete operation (document id)
     * @param deleteResult outcome of the operation
     */
    public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult deleteResult) {
        final String jobId = delete.id();

        if (deleteResult.getResultType() == Engine.Result.Type.FAILURE) {
            // Single lookup: the shard entry may be removed concurrently by a sweep, so a
            // containsKey()/get() pair could observe null and fail inside the log statement.
            final ConcurrentHashMap<String, JobDocVersion> shardJobs = this.sweptJobs.get(shardId);
            final JobDocVersion currentVersion = shardJobs == null ? null : shardJobs.get(jobId);
            log.debug("Deletion failed for scheduled job {}. Continuing with current version {}", jobId, currentVersion);
            return;
        }

        final String indexName = shardId.getIndexName();
        if (!this.scheduler.getScheduledJobIds(indexName).contains(jobId)) {
            return;
        }

        log.info("Descheduling job {} on index {}", jobId, indexName);
        this.scheduler.deschedule(indexName, jobId);
        // Lock deletion is best effort; both outcomes are only logged.
        this.lockService.deleteLock(LockModel.generateLockId(indexName, jobId), ActionListener.wrap(deleted -> {
            log.debug("Deleted lock: {}", deleted);
        }, exc -> {
            log.debug("Failed to delete lock", exc);
        }));
    }

    /**
     * Applies one version of a job document to the scheduler.
     * <p>
     * The decision is made atomically per job id inside the shard's version map:
     * <ol>
     *   <li>If {@code jobDocVersion} is not newer than the recorded version, nothing happens.</li>
     *   <li>Otherwise a currently scheduled instance of the job is descheduled.</li>
     *   <li>If {@code jobSource} is {@code null} or parses to {@code null}, the job's
     *       version entry is removed.</li>
     *   <li>Otherwise the job is scheduled when it is enabled, and {@code jobDocVersion}
     *       is recorded.</li>
     *   <li>If parsing or scheduling throws, a warning is logged and the previously
     *       recorded version is kept.</li>
     * </ol>
     *
     * @param shardId       shard the document lives on
     * @param docId         id of the job document
     * @param jobSource     source of the job document, may be {@code null}
     * @param jobDocVersion version of the document being applied
     */
    @VisibleForTesting
    void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion jobDocVersion) {
        final String indexName = shardId.getIndexName();

        // computeIfAbsent makes "look up or create" atomic. Separate containsKey/get/put calls
        // let concurrent callers install different maps for the same shard (losing recorded
        // versions) or read null after a concurrent removal.
        final ConcurrentHashMap<String, JobDocVersion> shardJobs =
            this.sweptJobs.computeIfAbsent(shardId, ignored -> new ConcurrentHashMap<>());

        shardJobs.compute(docId, (id, currentVersion) -> {
            if (jobDocVersion.compareTo(currentVersion) <= 0) {
                log.debug("Skipping job {}, new version {} <= current version {}", docId, jobDocVersion, currentVersion);
                return currentVersion;
            }

            // A newer version supersedes whatever is currently scheduled for this id.
            if (this.scheduler.getScheduledJobIds(indexName).contains(docId)) {
                this.scheduler.deschedule(indexName, docId);
            }
            if (jobSource == null) {
                // Returning null removes the job's entry from the map.
                return null;
            }

            try {
                final ScheduledJobProvider provider = this.indexToProviders.get(indexName);
                final ScheduledJobParameter jobParameter = provider.getJobParser().parse(
                    XContentHelper.createParser(this.xContentRegistry, LoggingDeprecationHandler.INSTANCE, jobSource, XContentType.JSON),
                    docId,
                    jobDocVersion
                );
                if (jobParameter == null) {
                    return null;
                }
                final ScheduledJobRunner jobRunner = provider.getJobRunner();
                if (jobParameter.isEnabled()) {
                    this.scheduler.schedule(indexName, docId, jobParameter, jobRunner, jobDocVersion, this.jitterLimit);
                }
                return jobDocVersion;
            } catch (Exception e) {
                log.warn("Unable to parse job {}, error message: {}", docId, e.getMessage());
                return currentVersion;
            }
        });
    }

    /**
     * Queues a sweep on the sweep executor for every registered job index whose
     * routing table changed in the given cluster event.
     *
     * @param clusterChangedEvent the cluster state change to inspect
     */
    public void clusterChanged(ClusterChangedEvent clusterChangedEvent) {
        this.indexToProviders.keySet()
            .stream()
            .filter(indexName -> clusterChangedEvent.indexRoutingTableChanged(indexName))
            .forEach(indexName -> this.fullSweepExecutor.submit(() -> sweepIndex(indexName)));
    }

    /**
     * (Re)creates the periodic full-sweep trigger using the current sweep period.
     * Any previously scheduled trigger is cancelled first.
     * <p>
     * On each tick the trigger queues a full sweep only when fewer than 20 ms of the
     * sweep period remain since the last recorded full-sweep time.
     */
    @VisibleForTesting
    void initBackgroundSweep() {
        if (this.scheduledFullSweep != null) {
            this.scheduledFullSweep.cancel();
        }

        final Runnable periodicTrigger = () -> {
            log.info("Running full sweep");
            final long remainingMillis = this.sweepPeriod.millis() - getFullSweepElapsedTime().millis();
            if (remainingMillis < 20) {
                this.fullSweepExecutor.submit(this::sweepAllJobIndices);
            }
        };
        this.scheduledFullSweep = this.threadPool.scheduleWithFixedDelay(periodicTrigger, this.sweepPeriod, "same");
    }

    /**
     * @return the time elapsed since {@code lastFullSweepTimeNano} was last set
     */
    private TimeValue getFullSweepElapsedTime() {
        final long elapsedNanos = System.nanoTime() - this.lastFullSweepTimeNano;
        return TimeValue.timeValueNanos(elapsedNanos);
    }

    /**
     * Collects the shards of an index that have an active copy on the given node.
     *
     * @param clusterState cluster state to read the routing table from
     * @param localNodeId  id of the node whose shards are wanted
     * @param indexName    name of the index to inspect
     * @return for every shard with an active copy on {@code localNodeId}: the shard id
     *         mapped to all active copies of that shard (on any node)
     */
    private Map<ShardId, List<ShardRouting>> getLocalShards(ClusterState clusterState, String localNodeId, String indexName) {
        // Step 1: group all active copies of the index by shard.
        final Map<ShardId, List<ShardRouting>> activeCopiesByShard = clusterState.routingTable()
            .allShards(indexName)
            .stream()
            .filter(ShardRouting::active)
            .collect(Collectors.groupingBy(ShardRouting::shardId));

        // Step 2: keep only the shards that have one of those copies on the requested node.
        return activeCopiesByShard.entrySet()
            .stream()
            .filter(shard -> shard.getValue().stream().anyMatch(copy -> copy.currentNodeId().equals(localNodeId)))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Sweeps every registered job index in turn and then records the current
     * {@code System.nanoTime()} as the last full-sweep time.
     */
    private void sweepAllJobIndices() {
        for (String indexName : this.indexToProviders.keySet()) {
            sweepIndex(indexName);
        }
        this.lastFullSweepTimeNano = System.nanoTime();
    }

    /**
     * Reconciles the scheduled jobs of one job index with the current cluster state.
     * <ul>
     *   <li>Index no longer in the routing table: every tracked shard of the index has
     *       its jobs descheduled and its tracking entry removed.</li>
     *   <li>Otherwise: tracked shards that no longer have an active copy on this node
     *       are descheduled and removed, then every local shard is swept. A failure
     *       while sweeping one shard is logged and does not stop the other shards.</li>
     * </ul>
     *
     * @param indexName name of the job index to sweep
     */
    private void sweepIndex(String indexName) {
        final ClusterState clusterState = this.clusterService.state();

        if (!clusterState.routingTable().hasIndex(indexName)) {
            // Remove the tracking entries together with the jobs. Otherwise the recorded
            // versions would outlive the index: a re-created index starts with lower document
            // versions, which sweep() would then skip as "not newer", and the same jobs would
            // be descheduled again on every later sweep.
            final Iterator<Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>>> tracked = this.sweptJobs.entrySet().iterator();
            while (tracked.hasNext()) {
                final Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>> shardJobs = tracked.next();
                if (shardJobs.getKey().getIndexName().equals(indexName)) {
                    log.info("Descheduling jobs, shard {} index {} as the index is removed.", shardJobs.getKey().getId(), indexName);
                    this.scheduler.bulkDeschedule(indexName, shardJobs.getValue().keySet());
                    tracked.remove();
                }
            }
            return;
        }

        final String localNodeId = clusterState.getNodes().getLocalNodeId();
        final Map<ShardId, List<ShardRouting>> localShards = getLocalShards(clusterState, localNodeId, indexName);

        // Drop shards of this index that are tracked but no longer local.
        final Iterator<Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>>> tracked = this.sweptJobs.entrySet().iterator();
        while (tracked.hasNext()) {
            final Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>> shardJobs = tracked.next();
            final ShardId shardId = shardJobs.getKey();
            if (shardId.getIndexName().equals(indexName) && !localShards.containsKey(shardId)) {
                log.info("Descheduling jobs of shard {} index {} as the shard is removed from this node.", shardId.getId(), indexName);
                this.scheduler.bulkDeschedule(indexName, shardJobs.getValue().keySet());
                tracked.remove();
            }
        }

        // Sweep each local shard; ownership is decided among the nodes holding an active copy.
        for (Map.Entry<ShardId, List<ShardRouting>> shard : localShards.entrySet()) {
            try {
                final List<String> shardNodeIds = shard.getValue().stream().map(ShardRouting::currentNodeId).collect(Collectors.toList());
                sweepShard(shard.getKey(), new ShardNodes(localNodeId, shardNodeIds), null);
            } catch (Exception e) {
                log.info("Error while sweeping shard {}, error message: {}", shard.getKey(), e.getMessage());
            }
        }
    }

    /**
     * Sweeps all job documents of one local shard.
     * <ol>
     *   <li>Jobs tracked for the shard that the local node no longer owns are
     *       descheduled and removed from the tracking map.</li>
     *   <li>The shard is then searched page by page (sorted by {@code _id}, using
     *       {@code search_after}); every hit owned by the local node is passed to
     *       {@code sweep}. Paging stops at the first empty page or on a non-OK response.</li>
     * </ol>
     *
     * @param shardId    shard to sweep
     * @param shardNodes ownership ring for this shard
     * @param startAfter document id to resume after; {@code null} starts from the beginning
     */
    private void sweepShard(ShardId shardId, ShardNodes shardNodes, String startAfter) {
        final String indexName = shardId.getIndexName();

        ConcurrentHashMap<String, JobDocVersion> trackedJobs;
        if (this.sweptJobs.containsKey(shardId)) {
            trackedJobs = this.sweptJobs.get(shardId);
        } else {
            trackedJobs = new ConcurrentHashMap<>();
        }

        // Step 1: give up jobs that now belong to another node.
        for (String trackedJobId : trackedJobs.keySet()) {
            if (!shardNodes.isOwningNode(trackedJobId)) {
                this.scheduler.deschedule(indexName, trackedJobId);
                trackedJobs.remove(trackedJobId);
            }
        }

        // Step 2: page through the documents stored on this shard.
        String searchAfter = startAfter == null ? "" : startAfter;
        while (searchAfter != null) {
            final SearchSourceBuilder pageQuery = new SearchSourceBuilder().version(true)
                .seqNoAndPrimaryTerm(true)
                .sort(new FieldSortBuilder("_id").unmappedType("keyword").missing("_last"))
                .searchAfter(new String[] { searchAfter })
                .size(this.sweepPageMaxSize.intValue())
                .query(QueryBuilders.matchAllQuery());
            final SearchRequest pageRequest = new SearchRequest().indices(new String[] { indexName })
                .preference("_shards:" + shardId.id() + "|_only_local")
                .source(pageQuery);

            final ActionFuture<SearchResponse> pending = retry(
                request -> this.client.search(request),
                pageRequest,
                this.sweepSearchBackoff
            );
            final SearchResponse page = pending.actionGet(this.sweepSearchTimeout);
            if (page.status() != RestStatus.OK) {
                log.error("Error sweeping shard {}, failed querying jobs on this shard", shardId);
                return;
            }

            for (SearchHit hit : page.getHits()) {
                final String jobId = hit.getId();
                if (shardNodes.isOwningNode(jobId)) {
                    sweep(shardId, jobId, hit.getSourceRef(), new JobDocVersion(hit.getPrimaryTerm(), hit.getSeqNo(), hit.getVersion()));
                }
            }

            // The id of the last hit is the cursor for the next page; an empty page ends the loop.
            if (page.getHits() == null || page.getHits().getHits().length < 1) {
                searchAfter = null;
            } else {
                final SearchHit[] hits = page.getHits().getHits();
                searchAfter = hits[hits.length - 1].getId();
            }
        }
    }

    /**
     * Applies {@code function} to {@code param}, retrying according to {@code backoffPolicy}.
     * <p>
     * A retry happens only when the call throws an {@link OpenSearchException} whose
     * status is BAD_GATEWAY, GATEWAY_TIMEOUT or SERVICE_UNAVAILABLE and the policy still
     * has a delay left; the thread sleeps for that delay before the next attempt.
     *
     * @param function      the call to perform
     * @param param         argument passed to {@code function} on every attempt
     * @param backoffPolicy supplies the delays between attempts
     * @param <T>           argument type
     * @param <R>           result type
     * @return the result of the first successful attempt
     * @throws OpenSearchException the last failure, when it is not retryable, the policy
     *                             is exhausted, or the thread is interrupted while waiting
     */
    private <T, R> R retry(Function<T, R> function, T param, BackoffPolicy backoffPolicy) {
        final HashSet<RestStatus> retryableStatuses = Sets.newHashSet(
            RestStatus.BAD_GATEWAY,
            RestStatus.GATEWAY_TIMEOUT,
            RestStatus.SERVICE_UNAVAILABLE
        );
        final Iterator<TimeValue> delays = backoffPolicy.iterator();
        while (true) {
            try {
                return function.apply(param);
            } catch (OpenSearchException e) {
                if (!delays.hasNext() || !retryableStatuses.contains(e.status())) {
                    throw e;
                }
                try {
                    Thread.sleep(delays.next().millis());
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller before giving up.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
