package org.apache.druid.segment.realtime.appenderator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.class */
public abstract class BaseAppenderatorDriver implements Closeable {
    private static final Logger log = new Logger(BaseAppenderatorDriver.class);
    private final SegmentAllocator segmentAllocator;
    private final UsedSegmentChecker usedSegmentChecker;
    private final DataSegmentKiller dataSegmentKiller;
    protected final Appenderator appenderator;
    protected final Map<String, SegmentsForSequence> segments = new TreeMap();
    protected final ListeningExecutorService executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver$SegmentsForSequence.class */
    public static class SegmentsForSequence {
        private final NavigableMap<Long, SegmentsOfInterval> intervalToSegmentStates;
        private String lastSegmentId;

        SegmentsForSequence() {
            this.intervalToSegmentStates = new TreeMap();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public SegmentsForSequence(NavigableMap<Long, SegmentsOfInterval> navigableMap, String str) {
            this.intervalToSegmentStates = navigableMap;
            this.lastSegmentId = str;
        }

        void add(SegmentIdWithShardSpec segmentIdWithShardSpec) {
            ((SegmentsOfInterval) this.intervalToSegmentStates.computeIfAbsent(Long.valueOf(segmentIdWithShardSpec.getInterval().getStartMillis()), l -> {
                return new SegmentsOfInterval(segmentIdWithShardSpec.getInterval());
            })).setAppendingSegment(SegmentWithState.newSegment(segmentIdWithShardSpec));
            this.lastSegmentId = segmentIdWithShardSpec.toString();
        }

        Map.Entry<Long, SegmentsOfInterval> floor(long j) {
            return this.intervalToSegmentStates.floorEntry(Long.valueOf(j));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public SegmentsOfInterval get(long j) {
            return (SegmentsOfInterval) this.intervalToSegmentStates.get(Long.valueOf(j));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Stream<SegmentWithState> allSegmentStateStream() {
            return this.intervalToSegmentStates.values().stream().flatMap(segmentsOfInterval -> {
                return segmentsOfInterval.getAllSegments().stream();
            });
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Stream<SegmentsOfInterval> getAllSegmentsOfInterval() {
            return this.intervalToSegmentStates.values().stream();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver$SegmentsOfInterval.class */
    public static class SegmentsOfInterval {
        private final Interval interval;
        private final List<SegmentWithState> appendFinishedSegments = new ArrayList();

        @Nullable
        private SegmentWithState appendingSegment;

        SegmentsOfInterval(Interval interval) {
            this.interval = interval;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public SegmentsOfInterval(Interval interval, @Nullable SegmentWithState segmentWithState, List<SegmentWithState> list) {
            this.interval = interval;
            this.appendingSegment = segmentWithState;
            this.appendFinishedSegments.addAll(list);
            if (segmentWithState != null) {
                Preconditions.checkArgument(segmentWithState.getState() == SegmentWithState.SegmentState.APPENDING, "appendingSegment[%s] is not in the APPENDING state", segmentWithState.getSegmentIdentifier());
            }
            if (list.stream().anyMatch(segmentWithState2 -> {
                return segmentWithState2.getState() == SegmentWithState.SegmentState.APPENDING;
            })) {
                throw new ISE("Some appendFinishedSegments[%s] is in the APPENDING state", list);
            }
        }

        void setAppendingSegment(SegmentWithState segmentWithState) {
            Preconditions.checkArgument(segmentWithState.getState() == SegmentWithState.SegmentState.APPENDING, "segment[%s] is not in the APPENDING state", segmentWithState.getSegmentIdentifier());
            Preconditions.checkState(this.appendingSegment == null, "Current appendingSegment[%s] is not null. Its state must be changed before setting a new appendingSegment[%s]", this.appendingSegment, segmentWithState);
            this.appendingSegment = segmentWithState;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void finishAppendingToCurrentActiveSegment(Consumer<SegmentWithState> consumer) {
            Preconditions.checkNotNull(this.appendingSegment, "appendingSegment");
            consumer.accept(this.appendingSegment);
            this.appendFinishedSegments.add(this.appendingSegment);
            this.appendingSegment = null;
        }

        Interval getInterval() {
            return this.interval;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public SegmentWithState getAppendingSegment() {
            return this.appendingSegment;
        }

        List<SegmentWithState> getAllSegments() {
            ArrayList arrayList = new ArrayList(this.appendFinishedSegments.size() + 1);
            if (this.appendingSegment != null) {
                arrayList.add(this.appendingSegment);
            }
            arrayList.addAll(this.appendFinishedSegments);
            return arrayList;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver$WrappedCommitter.class */
    public static class WrappedCommitter implements Committer {
        private final Committer delegate;
        private final AppenderatorDriverMetadata metadata;

        WrappedCommitter(Committer committer, AppenderatorDriverMetadata appenderatorDriverMetadata) {
            this.delegate = committer;
            this.metadata = appenderatorDriverMetadata;
        }

        @Override // org.apache.druid.data.input.Committer
        public Object getMetadata() {
            return this.metadata;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.delegate.run();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BaseAppenderatorDriver(Appenderator appenderator, SegmentAllocator segmentAllocator, UsedSegmentChecker usedSegmentChecker, DataSegmentKiller dataSegmentKiller) {
        this.appenderator = (Appenderator) Preconditions.checkNotNull(appenderator, "appenderator");
        this.segmentAllocator = (SegmentAllocator) Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
        this.usedSegmentChecker = (UsedSegmentChecker) Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
        this.dataSegmentKiller = (DataSegmentKiller) Preconditions.checkNotNull(dataSegmentKiller, "dataSegmentKiller");
        this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("[" + appenderator.getId() + "]-publish"));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public Map<String, SegmentsForSequence> getSegments() {
        return this.segments;
    }

    @Nullable
    public abstract Object startJob(AppenderatorDriverSegmentLockHelper appenderatorDriverSegmentLockHelper);

    private SegmentIdWithShardSpec getAppendableSegment(DateTime dateTime, String str) {
        synchronized (this.segments) {
            SegmentsForSequence segmentsForSequence = this.segments.get(str);
            if (segmentsForSequence == null) {
                return null;
            }
            Map.Entry<Long, SegmentsOfInterval> floor = segmentsForSequence.floor(dateTime.getMillis());
            if (floor == null) {
                return null;
            }
            SegmentsOfInterval value = floor.getValue();
            if (value.interval.contains(dateTime)) {
                return value.appendingSegment == null ? null : value.appendingSegment.getSegmentIdentifier();
            }
            return null;
        }
    }

    private SegmentIdWithShardSpec getSegment(InputRow inputRow, String str, boolean z) throws IOException {
        synchronized (this.segments) {
            DateTime timestamp = inputRow.getTimestamp();
            SegmentIdWithShardSpec appendableSegment = getAppendableSegment(timestamp, str);
            if (appendableSegment != null) {
                return appendableSegment;
            }
            SegmentsForSequence segmentsForSequence = this.segments.get(str);
            SegmentIdWithShardSpec allocate = this.segmentAllocator.allocate(inputRow, str, segmentsForSequence == null ? null : segmentsForSequence.lastSegmentId, z);
            if (allocate != null) {
                for (SegmentIdWithShardSpec segmentIdWithShardSpec : this.appenderator.getSegments()) {
                    if (segmentIdWithShardSpec.equals(allocate)) {
                        throw new ISE("Allocated segment[%s] which conflicts with existing segment[%s].", allocate, segmentIdWithShardSpec);
                    }
                }
                log.info("New segment[%s] for sequenceName[%s].", allocate, str);
                addSegment(str, allocate);
            } else {
                log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s].", timestamp, str);
            }
            return allocate;
        }
    }

    private void addSegment(String str, SegmentIdWithShardSpec segmentIdWithShardSpec) {
        synchronized (this.segments) {
            this.segments.computeIfAbsent(str, str2 -> {
                return new SegmentsForSequence();
            }).add(segmentIdWithShardSpec);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AppenderatorDriverAddResult append(InputRow inputRow, String str, @Nullable Supplier<Committer> supplier, boolean z, boolean z2) throws IOException {
        Preconditions.checkNotNull(inputRow, "row");
        Preconditions.checkNotNull(str, "sequenceName");
        SegmentIdWithShardSpec segment = getSegment(inputRow, str, z);
        if (segment == null) {
            return AppenderatorDriverAddResult.fail();
        }
        try {
            Appenderator.AppenderatorAddResult add = this.appenderator.add(segment, inputRow, supplier == null ? null : wrapCommitterSupplier(supplier), z2);
            return AppenderatorDriverAddResult.ok(segment, add.getNumRowsInSegment(), this.appenderator.getTotalRowCount(), add.isPersistRequired());
        } catch (SegmentNotWritableException e) {
            throw new ISE(e, "Segment[%s] not writable when it should have been.", segment);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<SegmentIdWithShardSpec> getSegmentIdsWithShardSpecs(Collection<String> collection) {
        List<SegmentIdWithShardSpec> list;
        synchronized (this.segments) {
            Stream<String> stream = collection.stream();
            Map<String, SegmentsForSequence> map = this.segments;
            map.getClass();
            list = (List) stream.map((v1) -> {
                return r1.get(v1);
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).flatMap(segmentsForSequence -> {
                return segmentsForSequence.intervalToSegmentStates.values().stream();
            }).flatMap(segmentsOfInterval -> {
                return segmentsOfInterval.getAllSegments().stream();
            }).map((v0) -> {
                return v0.getSegmentIdentifier();
            }).collect(Collectors.toList());
        }
        return list;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<SegmentIdWithShardSpec> getAppendingSegments(Collection<String> collection) {
        Set<SegmentIdWithShardSpec> set;
        synchronized (this.segments) {
            Stream<String> stream = collection.stream();
            Map<String, SegmentsForSequence> map = this.segments;
            map.getClass();
            set = (Set) stream.map((v1) -> {
                return r1.get(v1);
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).flatMap(segmentsForSequence -> {
                return segmentsForSequence.intervalToSegmentStates.values().stream();
            }).map(segmentsOfInterval -> {
                return segmentsOfInterval.appendingSegment;
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).map((v0) -> {
                return v0.getSegmentIdentifier();
            }).collect(Collectors.toSet());
        }
        return set;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ListenableFuture<SegmentsAndCommitMetadata> pushInBackground(@Nullable WrappedCommitter wrappedCommitter, Collection<SegmentIdWithShardSpec> collection, boolean z) {
        log.info("Pushing [%s] segments in background", Integer.valueOf(collection.size()));
        log.infoSegmentIds(collection.stream().map((v0) -> {
            return v0.asSegmentId();
        }), "Pushing segments");
        return Futures.transform(this.appenderator.push(collection, wrappedCommitter, z), segmentsAndCommitMetadata -> {
            Set set = (Set) segmentsAndCommitMetadata.getSegments().stream().map(SegmentIdWithShardSpec::fromDataSegment).collect(Collectors.toSet());
            if (set.equals(Sets.newHashSet(collection))) {
                return segmentsAndCommitMetadata;
            }
            log.warn("Removing [%s] segments from deep storage because sanity check failed", Integer.valueOf(segmentsAndCommitMetadata.getSegments().size()));
            log.warnSegments(segmentsAndCommitMetadata.getSegments(), "Removing segments due to failed sanity check");
            List<DataSegment> segments = segmentsAndCommitMetadata.getSegments();
            DataSegmentKiller dataSegmentKiller = this.dataSegmentKiller;
            dataSegmentKiller.getClass();
            segments.forEach(dataSegmentKiller::killQuietly);
            throw new ISE("Pushed different segments than requested. Pushed[%s], requested[%s].", set, collection);
        }, this.executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ListenableFuture<SegmentsAndCommitMetadata> dropInBackground(SegmentsAndCommitMetadata segmentsAndCommitMetadata) {
        log.debugSegments(segmentsAndCommitMetadata.getSegments(), "Dropping segments");
        return Futures.transform(Futures.allAsList((Iterable) segmentsAndCommitMetadata.getSegments().stream().map(dataSegment -> {
            return this.appenderator.drop(SegmentIdWithShardSpec.fromDataSegment(dataSegment));
        }).collect(Collectors.toList())), obj -> {
            Object commitMetadata = segmentsAndCommitMetadata.getCommitMetadata();
            return new SegmentsAndCommitMetadata(segmentsAndCommitMetadata.getSegments(), commitMetadata == null ? null : ((AppenderatorDriverMetadata) commitMetadata).getCallerMetadata());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ListenableFuture<SegmentsAndCommitMetadata> publishInBackground(@Nullable Set<DataSegment> set, SegmentsAndCommitMetadata segmentsAndCommitMetadata, TransactionalSegmentPublisher transactionalSegmentPublisher, Function<Set<DataSegment>, Set<DataSegment>> function) {
        if (segmentsAndCommitMetadata.getSegments().isEmpty()) {
            if (!transactionalSegmentPublisher.supportsEmptyPublish()) {
                log.info("Nothing to publish, skipping publish step.", new Object[0]);
                SettableFuture create = SettableFuture.create();
                create.set(segmentsAndCommitMetadata);
                return create;
            }
            if (this.appenderator.getTotalRowCount() != 0) {
                throw new ISE("Attempting to publish with empty segment set, but total row count was not 0: [%s].", Integer.valueOf(this.appenderator.getTotalRowCount()));
            }
        }
        Object commitMetadata = segmentsAndCommitMetadata.getCommitMetadata();
        Object callerMetadata = commitMetadata == null ? null : ((AppenderatorDriverMetadata) commitMetadata).getCallerMetadata();
        return this.executor.submit(() -> {
            try {
                ImmutableSet copyOf = ImmutableSet.copyOf((Collection) segmentsAndCommitMetadata.getSegments());
                SegmentPublishResult publishSegments = transactionalSegmentPublisher.publishSegments(set, copyOf, function, callerMetadata);
                if (publishSegments.isSuccess()) {
                    log.info("Published [%s] segments with commit metadata [%s]", Integer.valueOf(segmentsAndCommitMetadata.getSegments().size()), callerMetadata);
                    log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
                } else {
                    Set<DataSegment> findUsedSegments = this.usedSegmentChecker.findUsedSegments((Set) segmentsAndCommitMetadata.getSegments().stream().map(SegmentIdWithShardSpec::fromDataSegment).collect(Collectors.toSet()));
                    if (!findUsedSegments.equals(copyOf)) {
                        List<DataSegment> segments = segmentsAndCommitMetadata.getSegments();
                        DataSegmentKiller dataSegmentKiller = this.dataSegmentKiller;
                        dataSegmentKiller.getClass();
                        segments.forEach(dataSegmentKiller::killQuietly);
                        if (publishSegments.getErrorMsg() != null) {
                            log.errorSegments(copyOf, "Failed to publish segments");
                            throw new ISE("Failed to publish segments because of [%s]", publishSegments.getErrorMsg());
                        }
                        log.errorSegments(copyOf, "Failed to publish segments");
                        throw new ISE("Failed to publish segments", new Object[0]);
                    }
                    log.info("Could not publish [%s] segments, but checked and found them already published; continuing.", Integer.valueOf(copyOf.size()));
                    log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Could not publish segments");
                    if (Sets.intersection((Set) findUsedSegments.stream().map((v0) -> {
                        return v0.getLoadSpec();
                    }).collect(Collectors.toSet()), (Set) copyOf.stream().map((v0) -> {
                        return v0.getLoadSpec();
                    }).collect(Collectors.toSet())).isEmpty()) {
                        List<DataSegment> segments2 = segmentsAndCommitMetadata.getSegments();
                        DataSegmentKiller dataSegmentKiller2 = this.dataSegmentKiller;
                        dataSegmentKiller2.getClass();
                        segments2.forEach(dataSegmentKiller2::killQuietly);
                    }
                }
                return segmentsAndCommitMetadata;
            } catch (Exception e) {
                log.noStackTrace().warn(e, "Failed publish", new Object[0]);
                log.warnSegments(segmentsAndCommitMetadata.getSegments(), "Failed publish, not removing segments");
                Throwables.propagateIfPossible(e);
                throw new RuntimeException(e);
            }
        });
    }

    @VisibleForTesting
    public void clear() throws InterruptedException {
        synchronized (this.segments) {
            this.segments.clear();
        }
        this.appenderator.clear();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.executor.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WrappedCommitter wrapCommitter(Committer committer) {
        ImmutableMap copyOf;
        synchronized (this.segments) {
            copyOf = ImmutableMap.copyOf((Map) this.segments);
        }
        return new WrappedCommitter(committer, new AppenderatorDriverMetadata(ImmutableMap.copyOf(Maps.transformValues(copyOf, segmentsForSequence -> {
            return ImmutableList.copyOf((Collection) segmentsForSequence.intervalToSegmentStates.values().stream().flatMap(segmentsOfInterval -> {
                return segmentsOfInterval.getAllSegments().stream();
            }).collect(Collectors.toList()));
        })), CollectionUtils.mapValues(copyOf, segmentsForSequence2 -> {
            return segmentsForSequence2.lastSegmentId;
        }), committer.getMetadata()));
    }

    private Supplier<Committer> wrapCommitterSupplier(Supplier<Committer> supplier) {
        return () -> {
            return wrapCommitter((Committer) supplier.get2());
        };
    }
}
