/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.lookup.RocksDBListState;
import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.lookup.RocksDBSetState;
import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBValueState;
import org.apache.paimon.utils.KeyValueIterator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;
import org.rocksdb.TtlDB;

/**
 * Factory that opens a single RocksDB instance under a given directory and creates
 * column-family-backed states ({@link RocksDBValueState}, {@link RocksDBSetState},
 * {@link RocksDBListState}) on top of it.
 *
 * <p>The factory owns the RocksDB handle and the native option objects it creates; all of them
 * are released by {@link #close()}. States created by this factory must not be used after the
 * factory has been closed, because the underlying database is closed with it.
 */
public class RocksDBStateFactory implements Closeable {

    /** Name of the merge operator configured on every column family created by this factory. */
    public static final String MERGE_OPERATOR_NAME = "stringappendtest";

    /** Number of records written between two checks of the current SST file size. */
    private static final long FILE_SIZE_CHECK_INTERVAL = 1000L;

    private final DBOptions dbOptions;
    private final Options options;
    private final String path;
    private final ColumnFamilyOptions columnFamilyOptions;
    private RocksDB db;
    /** Monotonic counter used to give every bulk-load SST file a unique name under {@link #path}. */
    private int sstIndex = 0;

    /**
     * Opens (creating it if missing) a RocksDB instance located at {@code path}.
     *
     * @param path directory of the RocksDB instance; bulk-load SST files are also written here
     * @param conf configuration handed to {@link RocksDBOptions} to build the DB and column options
     * @param ttlSecs when non-null the database is opened as a {@link TtlDB} with this time-to-live,
     *     otherwise as a plain {@link RocksDB}
     * @throws IOException if RocksDB fails to open the instance
     */
    public RocksDBStateFactory(String path, org.apache.paimon.options.Options conf, @Nullable Duration ttlSecs) throws IOException {
        this.path = path;
        this.dbOptions = RocksDBOptions.createDBOptions(
                new DBOptions().setUseFsync(false).setStatsDumpPeriodSec(0).setCreateIfMissing(true), conf);
        this.columnFamilyOptions = RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), conf)
                .setMergeOperatorName(MERGE_OPERATOR_NAME);
        this.options = new Options(this.dbOptions, this.columnFamilyOptions);
        try {
            if (ttlSecs == null) {
                this.db = RocksDB.open(this.options, path);
            } else {
                // NOTE(review): the narrowing cast silently truncates durations longer than
                // Integer.MAX_VALUE seconds; kept as-is to preserve existing behaviour.
                this.db = TtlDB.open(this.options, path, (int) ttlSecs.getSeconds(), false);
            }
        } catch (RocksDBException e) {
            // No instance will ever be handed to a caller, so close() would never run:
            // release the native option objects here instead of leaking them.
            closeOptions();
            throw new IOException("Error while opening RocksDB instance.", e);
        }
    }

    /**
     * Writes all entries of {@code iterator} into one or more SST files under the database
     * directory and ingests them into the column family of {@code state}.
     *
     * <p>A new SST file is started once the current one has reached the configured target file
     * size; the size is only checked every {@value #FILE_SIZE_CHECK_INTERVAL} records. Nothing is
     * ingested when the iterator is empty.
     *
     * @param state state whose column family receives the data
     * @param iterator source of key/value pairs, in the order they are written to the SST files
     * @throws IOException declared for failures while advancing the iterator
     * @throws RocksDBException if opening/finishing an SST file or the ingestion fails
     * @throws RuntimeException if RocksDB rejects a record while it is being written
     */
    public void bulkLoad(RocksDBState<?, ?, ?> state, KeyValueIterator<byte[], byte[]> iterator) throws IOException, RocksDBException {
        long targetFileSize = this.options.targetFileSizeBase();
        ArrayList<String> files = new ArrayList<String>();
        long recordNum = 0L;

        // One EnvOptions is enough for all writers of this call; it is released with the block.
        try (EnvOptions envOptions = new EnvOptions()) {
            SstFileWriter writer = null;
            try {
                while (iterator.advanceNext()) {
                    byte[] key = iterator.getKey();
                    byte[] value = iterator.getValue();

                    if (writer == null) {
                        writer = new SstFileWriter(envOptions, this.options);
                        String sstPath = new File(this.path, "sst-" + this.sstIndex++).getPath();
                        writer.open(sstPath);
                        files.add(sstPath);
                    }

                    try {
                        writer.put(key, value);
                    } catch (RocksDBException e) {
                        throw new RuntimeException("Exception in bulkLoad, the most suspicious reason is that your data contains duplicates, please check your sink table. (The likelihood of duplication is that you used multiple jobs to write the same dynamic bucket table, it only supports single write)", e);
                    }

                    recordNum++;
                    if (recordNum % FILE_SIZE_CHECK_INTERVAL == 0L && writer.fileSize() >= targetFileSize) {
                        // Current file is full: seal it, free its native handle and roll over.
                        writer.finish();
                        writer.close();
                        writer = null;
                        recordNum = 0L;
                    }
                }

                if (writer != null) {
                    writer.finish();
                }
            } finally {
                // Covers both the last (finished) writer and a writer abandoned by an exception.
                if (writer != null) {
                    writer.close();
                }
            }
        }

        if (!files.isEmpty()) {
            try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
                this.db.ingestExternalFile(state.columnFamily, files, ingestOptions);
            }
        }
    }

    /**
     * Creates a new column family called {@code name} and returns a value state backed by it.
     *
     * @param name column family name
     * @param keySerializer serializer for keys
     * @param valueSerializer serializer for values
     * @param lruCacheSize cache size passed through to the state
     * @throws IOException if the column family cannot be created
     */
    public <K, V> RocksDBValueState<K, V> valueState(String name, Serializer<K> keySerializer, Serializer<V> valueSerializer, long lruCacheSize) throws IOException {
        return new RocksDBValueState<K, V>(this.db, this.createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
    }

    /**
     * Creates a new column family called {@code name} and returns a set state backed by it.
     *
     * @param name column family name
     * @param keySerializer serializer for keys
     * @param valueSerializer serializer for values
     * @param lruCacheSize cache size passed through to the state
     * @throws IOException if the column family cannot be created
     */
    public <K, V> RocksDBSetState<K, V> setState(String name, Serializer<K> keySerializer, Serializer<V> valueSerializer, long lruCacheSize) throws IOException {
        return new RocksDBSetState<K, V>(this.db, this.createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
    }

    /**
     * Creates a new column family called {@code name} and returns a list state backed by it.
     *
     * @param name column family name
     * @param keySerializer serializer for keys
     * @param valueSerializer serializer for values
     * @param lruCacheSize cache size passed through to the state
     * @throws IOException if the column family cannot be created
     */
    public <K, V> RocksDBListState<K, V> listState(String name, Serializer<K> keySerializer, Serializer<V> valueSerializer, long lruCacheSize) throws IOException {
        return new RocksDBListState<K, V>(this.db, this.createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
    }

    /** Creates a column family named {@code name} (UTF-8 encoded) using the shared column options. */
    private ColumnFamilyHandle createColumnFamily(String name) throws IOException {
        try {
            return this.db.createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8), this.columnFamilyOptions));
        } catch (RocksDBException e) {
            throw new IOException(e);
        }
    }

    /**
     * Closes the database and releases the native option objects owned by this factory.
     * Calling it more than once is harmless.
     */
    @Override
    public void close() throws IOException {
        if (this.db != null) {
            this.db.close();
            this.db = null;
        }
        // Options are released only after the database that was opened with them is closed.
        closeOptions();
    }

    /** Releases the native memory held by the option objects created in the constructor. */
    private void closeOptions() {
        this.options.close();
        this.columnFamilyOptions.close();
        this.dbOptions.close();
    }
}

