/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.consumer;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileUtils;

/**
 * Manages consumer files stored under the {@code consumer} sub-directory of a table path.
 *
 * <p>Each consumer is persisted as one file named {@code consumer-<consumerId>} whose content is
 * the JSON produced by {@link Consumer#toJson()}.
 */
public class ConsumerManager implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Directory (relative to the table path) that holds all consumer files. */
    private static final String CONSUMER_DIRECTORY = "/consumer";

    /** File-name prefix shared by all consumer files. */
    private static final String CONSUMER_PREFIX = "consumer-";

    private final FileIO fileIO;
    private final Path tablePath;

    /**
     * Creates a manager for the consumers of one table.
     *
     * @param fileIO file system abstraction used for every read, write, list and delete
     * @param tablePath root path of the table; consumer files live in its {@code consumer}
     *     sub-directory
     */
    public ConsumerManager(FileIO fileIO, Path tablePath) {
        this.fileIO = fileIO;
        this.tablePath = tablePath;
    }

    /**
     * Loads the consumer stored under the given id.
     *
     * @param consumerId id of the consumer (the file-name suffix after {@code consumer-})
     * @return whatever {@link Consumer#fromPath} yields for the consumer's file; presumably empty
     *     when the file does not exist (verify against {@code Consumer.fromPath})
     */
    public Optional<Consumer> consumer(String consumerId) {
        return Consumer.fromPath(fileIO, consumerPath(consumerId));
    }

    /**
     * Writes the given consumer as UTF-8 encoded JSON to the file of {@code consumerId}.
     *
     * @param consumerId id of the consumer to write
     * @param consumer state to persist
     * @throws UncheckedIOException if opening or writing the file fails
     */
    public void resetConsumer(String consumerId, Consumer consumer) {
        // NOTE(review): the boolean flag is presumably "overwrite" - confirm against FileIO.
        try (PositionOutputStream out = fileIO.newOutputStream(consumerPath(consumerId), true)) {
            // The writer is only flushed, not closed: the underlying stream is owned and closed
            // by the try-with-resources statement.
            OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
            writer.write(consumer.toJson());
            writer.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Computes the smallest {@code nextSnapshot} over all consumers that can be loaded.
     *
     * @return the minimum next snapshot, or an empty {@link OptionalLong} when no consumer could
     *     be loaded
     * @throws UncheckedIOException if listing the consumer directory fails
     */
    public OptionalLong minNextSnapshot() {
        try {
            return FileUtils.listOriginalVersionedFiles(
                            fileIO, consumerDirectory(), CONSUMER_PREFIX)
                    .map(this::consumer)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .mapToLong(Consumer::nextSnapshot)
                    .min();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Deletes every consumer file whose modification time is strictly before
     * {@code expireDateTime}.
     *
     * @param expireDateTime cut-off time; files modified exactly at this time are kept
     * @throws UncheckedIOException if listing the consumer directory fails
     */
    public void expire(LocalDateTime expireDateTime) {
        try {
            FileUtils.listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX)
                    .forEach(
                            status -> {
                                LocalDateTime modificationTime =
                                        DateTimeUtils.toLocalDateTime(
                                                status.getModificationTime());
                                if (expireDateTime.isAfter(modificationTime)) {
                                    // NOTE(review): deleteQuietly presumably swallows delete
                                    // failures (best effort) - confirm against FileIO.
                                    fileIO.deleteQuietly(status.getPath());
                                }
                            });
        } catch (IOException e) {
            // Wrapped the same way as in the other methods of this class; UncheckedIOException
            // is a RuntimeException, so existing callers are unaffected.
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Collects the next snapshot of every consumer that can be loaded.
     *
     * @return a new mutable map from consumer id to its {@code nextSnapshot}
     * @throws IOException if listing the consumer directory fails
     */
    public Map<String, Long> consumers() throws IOException {
        Map<String, Long> consumers = new HashMap<>();
        FileUtils.listOriginalVersionedFiles(fileIO, consumerDirectory(), CONSUMER_PREFIX)
                .forEach(
                        id ->
                                consumer(id)
                                        .ifPresent(
                                                value -> consumers.put(id, value.nextSnapshot())));
        return consumers;
    }

    /**
     * Lists the ids of all consumer files found in the consumer directory.
     *
     * @return the consumer ids, in the order produced by the directory listing
     * @throws UncheckedIOException if listing the consumer directory fails
     */
    public List<String> listAllIds() {
        try {
            return FileUtils.listOriginalVersionedFiles(
                            fileIO, consumerDirectory(), CONSUMER_PREFIX)
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the directory that holds all consumer files of this table. */
    private Path consumerDirectory() {
        return new Path(tablePath + CONSUMER_DIRECTORY);
    }

    /** Returns the file path of the consumer with the given id. */
    private Path consumerPath(String consumerId) {
        return new Path(tablePath + CONSUMER_DIRECTORY + "/" + CONSUMER_PREFIX + consumerId);
    }
}

