/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.sink;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;

/**
 * {@link InnerTableCommit} implementation that delegates the actual commit work to a
 * {@link FileStoreCommit} and, after committing, runs the optional maintenance steps
 * (consumer expiration, {@link FileStoreExpire}, {@link PartitionExpire},
 * {@link TagAutoCreation}) followed by the registered {@link CommitCallback}s.
 *
 * <p>The instance keeps mutable state ({@code overwritePartition}, {@code batchCommitted})
 * without any synchronization in this class.
 */
public class TableCommitImpl
implements InnerTableCommit {
    private final FileStoreCommit commit;
    private final List<CommitCallback> commitCallbacks;
    @Nullable
    private final FileStoreExpire expire;
    @Nullable
    private final PartitionExpire partitionExpire;
    @Nullable
    private final TagAutoCreation tagAutoCreation;
    private final Lock lock;
    @Nullable
    private final Duration consumerExpireTime;
    private final ConsumerManager consumerManager;
    /** Partition spec to overwrite; {@code null} means regular (non-overwrite) commits. */
    @Nullable
    private Map<String, String> overwritePartition = null;
    /** Set once {@link #commit(List)} has been called; that method may only run once. */
    private boolean batchCommitted = false;

    /**
     * Creates the commit and hands {@code lock} to {@code commit} and, when present, to
     * {@code expire} and {@code partitionExpire}.
     *
     * @param commit underlying file store commit
     * @param commitCallbacks callbacks invoked with the committables after a commit
     * @param expire optional expire step run after each commit
     * @param partitionExpire optional partition expire step run after each commit
     * @param tagAutoCreation optional tag creation step run after each commit
     * @param lock lock shared with the delegates; closed by {@link #close()}
     * @param consumerExpireTime optional duration; when set, consumers are expired using
     *     {@code now - consumerExpireTime} as the threshold after each commit
     * @param consumerManager manager used for consumer expiration
     */
    public TableCommitImpl(FileStoreCommit commit, List<CommitCallback> commitCallbacks, @Nullable FileStoreExpire expire, @Nullable PartitionExpire partitionExpire, @Nullable TagAutoCreation tagAutoCreation, Lock lock, @Nullable Duration consumerExpireTime, ConsumerManager consumerManager) {
        commit.withLock(lock);
        if (expire != null) {
            expire.withLock(lock);
        }
        if (partitionExpire != null) {
            partitionExpire.withLock(lock);
        }
        this.commit = commit;
        this.commitCallbacks = commitCallbacks;
        this.expire = expire;
        this.partitionExpire = partitionExpire;
        this.tagAutoCreation = tagAutoCreation;
        this.lock = lock;
        this.consumerExpireTime = consumerExpireTime;
        this.consumerManager = consumerManager;
    }

    /**
     * Switches this commit to overwrite mode for the given partition spec, or back to
     * regular mode when {@code overwritePartitions} is {@code null}.
     *
     * @return this instance, for chaining
     */
    @Override
    public TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePartitions) {
        this.overwritePartition = overwritePartitions;
        return this;
    }

    /**
     * Forwards the flag to the underlying {@link FileStoreCommit}.
     *
     * @return this instance, for chaining
     */
    @Override
    public TableCommitImpl ignoreEmptyCommit(boolean ignoreEmptyCommit) {
        commit.ignoreEmptyCommit(ignoreEmptyCommit);
        return this;
    }

    /** Delegates to {@link FileStoreCommit#filterCommitted} and returns its result unchanged. */
    @Override
    public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
        return commit.filterCommitted(commitIdentifiers);
    }

    /**
     * Batch commit: commits the messages once under the identifier {@link Long#MAX_VALUE}.
     *
     * <p>A second call on the same instance is rejected by {@code Preconditions.checkState}
     * (presumably with an {@code IllegalStateException} -- confirm against Preconditions).
     */
    @Override
    public void commit(List<CommitMessage> commitMessages) {
        Preconditions.checkState(!batchCommitted, "BatchTableCommit only support one-time committing.");
        batchCommitted = true;
        commit(Long.MAX_VALUE, commitMessages);
    }

    /** Wraps the messages into a single committable with the given identifier and commits it. */
    @Override
    public void commit(long identifier, List<CommitMessage> commitMessages) {
        commit(createManifestCommittable(identifier, commitMessages));
    }

    /**
     * Builds one committable per map entry and passes them to
     * {@link #filterAndCommitMultiple(List)}.
     *
     * @return the value returned by {@link #filterAndCommitMultiple(List)}
     */
    @Override
    public int filterAndCommit(Map<Long, List<CommitMessage>> commitIdentifiersAndMessages) {
        List<ManifestCommittable> committables = commitIdentifiersAndMessages.entrySet().stream()
                .map(entry -> createManifestCommittable(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
        return filterAndCommitMultiple(committables);
    }

    /** Creates a committable for {@code identifier} holding every message in order. */
    private ManifestCommittable createManifestCommittable(long identifier, List<CommitMessage> commitMessages) {
        ManifestCommittable committable = new ManifestCommittable(identifier);
        for (CommitMessage message : commitMessages) {
            committable.addFileCommittable(message);
        }
        return committable;
    }

    /** Commits a single committable; shorthand for {@link #commitMultiple(List)}. */
    public void commit(ManifestCommittable committable) {
        commitMultiple(Collections.singletonList(committable));
    }

    /**
     * Commits the given committables, runs the post-commit maintenance steps, and finally
     * invokes every commit callback with the list that was passed in.
     *
     * <p>In regular mode each committable is committed in list order and maintenance runs
     * only when the list is non-empty, using the last committable's identifier. In overwrite
     * mode at most one committable is accepted; with an empty list an empty committable with
     * identifier {@link Long#MAX_VALUE} is overwritten instead.
     *
     * @throws RuntimeException in overwrite mode when more than one committable is given
     */
    public void commitMultiple(List<ManifestCommittable> committables) {
        if (overwritePartition == null) {
            for (ManifestCommittable committable : committables) {
                commit.commit(committable, new HashMap<>());
            }
            if (!committables.isEmpty()) {
                expire(committables.get(committables.size() - 1).identifier());
            }
        } else {
            ManifestCommittable committable;
            if (committables.size() > 1) {
                throw new RuntimeException("Multiple committables appear in overwrite mode, this may be a bug, please report it: " + committables);
            } else if (committables.size() == 1) {
                committable = committables.get(0);
            } else {
                // Nothing was handed in: overwrite with an empty committable that carries
                // the same Long.MAX_VALUE identifier that commit(List) uses.
                committable = new ManifestCommittable(Long.MAX_VALUE);
            }
            commit.overwrite(overwritePartition, committable, Collections.emptyMap());
            expire(committable.identifier());
        }
        // Callbacks always receive the caller's list, even when an empty committable was
        // synthesized above.
        commitCallbacks.forEach(callback -> callback.call(committables));
    }

    /**
     * Asks the underlying commit which identifiers still need committing, invokes the
     * callbacks for the remaining committables, then commits the ones that need it in
     * ascending identifier order.
     *
     * @return the number of committables that were (re-)committed by this call
     */
    public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
        Set<Long> allIdentifiers = committables.stream()
                .map(ManifestCommittable::identifier)
                .collect(Collectors.toSet());
        Set<Long> retryIdentifiers = commit.filterCommitted(allIdentifiers);

        // Committables not returned by filterCommitted still have their callbacks invoked
        // (presumably to replay callbacks that failed after the commit itself had already
        // succeeded -- confirm with callers).
        List<ManifestCommittable> succeededCommittables = committables.stream()
                .filter(committable -> !retryIdentifiers.contains(committable.identifier()))
                .collect(Collectors.toList());
        commitCallbacks.forEach(callback -> callback.call(succeededCommittables));

        List<ManifestCommittable> retryCommittables = committables.stream()
                .filter(committable -> retryIdentifiers.contains(committable.identifier()))
                .sorted(Comparator.comparingLong(ManifestCommittable::identifier))
                .collect(Collectors.toList());
        if (!retryCommittables.isEmpty()) {
            commitMultiple(retryCommittables);
        }
        return retryCommittables.size();
    }

    /**
     * Runs every configured post-commit maintenance step, in this order: consumer
     * expiration, expire, partition expire, tag auto creation. Steps whose component is
     * {@code null} are skipped.
     *
     * @param partitionExpireIdentifier identifier passed to the partition expire step
     */
    private void expire(long partitionExpireIdentifier) {
        if (consumerExpireTime != null) {
            consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
        }
        if (expire != null) {
            expire.expire();
        }
        if (partitionExpire != null) {
            partitionExpire.expire(partitionExpireIdentifier);
        }
        if (tagAutoCreation != null) {
            tagAutoCreation.run();
        }
    }

    /**
     * Closes every commit callback and then the lock through {@code IOUtils.closeQuietly}
     * (presumably suppressing exceptions raised while closing -- confirm against IOUtils).
     */
    @Override
    public void close() throws Exception {
        for (CommitCallback callback : commitCallbacks) {
            IOUtils.closeQuietly((AutoCloseable)callback);
        }
        IOUtils.closeQuietly((AutoCloseable)lock);
    }

    /** Delegates aborting the given messages to the underlying {@link FileStoreCommit}. */
    @Override
    public void abort(List<CommitMessage> commitMessages) {
        commit.abort(commitMessages);
    }
}

