/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.jdbc;

import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.jdbc.JdbcCatalogLoader;
import org.apache.paimon.jdbc.JdbcCatalogLock;
import org.apache.paimon.jdbc.JdbcCatalogLockContext;
import org.apache.paimon.jdbc.JdbcCatalogLockFactory;
import org.apache.paimon.jdbc.JdbcClientPool;
import org.apache.paimon.jdbc.JdbcUtils;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
import org.apache.paimon.shade.guava30.com.google.common.collect.Sets;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcCatalog
extends AbstractCatalog {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class);
    public static final String PROPERTY_PREFIX = "jdbc.";
    private static final String DATABASE_EXISTS_PROPERTY = "exists";
    private final JdbcClientPool connections;
    private final String catalogKey;
    private final Options options;
    private final String warehouse;

    protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, String warehouse) {
        super(fileIO, options);
        this.catalogKey = catalogKey;
        this.options = options;
        this.warehouse = warehouse;
        Preconditions.checkNotNull((Object)options, (String)"Invalid catalog properties: null");
        this.connections = new JdbcClientPool((Integer)options.get(CatalogOptions.CLIENT_POOL_SIZE), options.get(CatalogOptions.URI.key()), options.toMap());
        try {
            this.initializeCatalogTablesIfNeed();
        }
        catch (SQLException e) {
            throw new RuntimeException("Cannot initialize JDBC catalog", e);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted in call to initialize", e);
        }
    }

    @VisibleForTesting
    public JdbcClientPool getConnections() {
        return this.connections;
    }

    private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException {
        String uri = this.options.get(CatalogOptions.URI.key());
        Preconditions.checkNotNull((Object)uri, (String)"JDBC connection URI is required");
        this.connections.run(conn -> {
            DatabaseMetaData dbMeta = conn.getMetaData();
            ResultSet tableExists = dbMeta.getTables(null, null, "paimon_tables", null);
            if (tableExists.next()) {
                return true;
            }
            return conn.prepareStatement("CREATE TABLE paimon_tables(catalog_key VARCHAR(255) NOT NULL,database_name VARCHAR(255) NOT NULL,table_name VARCHAR(255) NOT NULL, PRIMARY KEY (catalog_key, database_name, table_name))").execute();
        });
        this.connections.run(conn -> {
            DatabaseMetaData dbMeta = conn.getMetaData();
            ResultSet tableExists = dbMeta.getTables(null, null, "paimon_database_properties", null);
            if (tableExists.next()) {
                return true;
            }
            return conn.prepareStatement("CREATE TABLE paimon_database_properties(catalog_key VARCHAR(255) NOT NULL,database_name VARCHAR(255) NOT NULL,property_key VARCHAR(255),property_value VARCHAR(1000),PRIMARY KEY (catalog_key, database_name, property_key))").execute();
        });
        if (this.lockEnabled()) {
            JdbcUtils.createDistributedLockTable(this.connections, this.options);
        }
    }

    @Override
    public String warehouse() {
        return this.warehouse;
    }

    @Override
    public CatalogLoader catalogLoader() {
        return new JdbcCatalogLoader(this.fileIO, this.catalogKey, this.options, this.warehouse);
    }

    @Override
    public List<String> listDatabases() {
        ArrayList databases = Lists.newArrayList();
        databases.addAll(this.fetch(row -> row.getString("database_name"), "SELECT DISTINCT database_name FROM paimon_tables WHERE catalog_key = ?", this.catalogKey));
        databases.addAll(this.fetch(row -> row.getString("database_name"), "SELECT DISTINCT database_name FROM paimon_database_properties WHERE catalog_key = ?", this.catalogKey));
        return databases.stream().distinct().collect(Collectors.toList());
    }

    @Override
    protected Database getDatabaseImpl(String databaseName) throws Catalog.DatabaseNotExistException {
        if (!JdbcUtils.databaseExists(this.connections, this.catalogKey, databaseName)) {
            throw new Catalog.DatabaseNotExistException(databaseName);
        }
        HashMap options = Maps.newHashMap();
        options.putAll(this.fetchProperties(databaseName));
        if (!options.containsKey("location")) {
            options.put("location", this.newDatabasePath(databaseName).getName());
        }
        options.remove(DATABASE_EXISTS_PROPERTY);
        return Database.of(databaseName, options, null);
    }

    @Override
    protected void createDatabaseImpl(String name, Map<String, String> properties) {
        HashMap<String, String> createProps = new HashMap<String, String>();
        createProps.put(DATABASE_EXISTS_PROPERTY, "true");
        if (properties != null && !properties.isEmpty()) {
            createProps.putAll(properties);
        }
        if (!createProps.containsKey("location")) {
            Path databasePath = this.newDatabasePath(name);
            createProps.put("location", databasePath.toString());
        }
        JdbcUtils.insertProperties(this.connections, this.catalogKey, name, createProps);
    }

    @Override
    protected void dropDatabaseImpl(String name) {
        JdbcUtils.execute(this.connections, "DELETE FROM  paimon_tables WHERE catalog_key = ? AND database_name = ?", this.catalogKey, name);
        JdbcUtils.execute(this.connections, "DELETE FROM paimon_database_properties WHERE catalog_key = ? AND database_name = ?", this.catalogKey, name);
    }

    @Override
    protected void alterDatabaseImpl(String name, List<PropertyChange> changes) {
        Pair<Map<String, String>, Set<String>> setPropertiesToRemoveKeys = PropertyChange.getSetPropertiesToRemoveKeys(changes);
        Map setProperties = (Map)setPropertiesToRemoveKeys.getLeft();
        Set removeKeys = (Set)setPropertiesToRemoveKeys.getRight();
        Map<String, String> startingProperties = this.fetchProperties(name);
        HashMap inserts = Maps.newHashMap();
        HashMap updates = Maps.newHashMap();
        HashSet removes = Sets.newHashSet();
        if (!setProperties.isEmpty()) {
            setProperties.forEach((k, v) -> {
                if (!startingProperties.containsKey(k)) {
                    inserts.put(k, v);
                } else {
                    updates.put(k, v);
                }
            });
        }
        if (!removeKeys.isEmpty()) {
            removeKeys.forEach(k -> {
                if (startingProperties.containsKey(k)) {
                    removes.add(k);
                }
            });
        }
        if (!inserts.isEmpty()) {
            JdbcUtils.insertProperties(this.connections, this.catalogKey, name, inserts);
        }
        if (!updates.isEmpty()) {
            JdbcUtils.updateProperties(this.connections, this.catalogKey, name, updates);
        }
        if (!removes.isEmpty()) {
            JdbcUtils.deleteProperties(this.connections, this.catalogKey, name, removes);
        }
    }

    @Override
    protected List<String> listTablesImpl(String databaseName) {
        return this.fetch(row -> row.getString("table_name"), "SELECT * FROM paimon_tables WHERE catalog_key = ? AND database_name = ?", this.catalogKey, databaseName);
    }

    @Override
    protected void dropTableImpl(Identifier identifier, List<Path> externalPaths) {
        try {
            int deletedRecords = JdbcUtils.execute(this.connections, "DELETE FROM paimon_tables WHERE catalog_key = ? AND database_name = ? AND table_name = ? ", this.catalogKey, identifier.getDatabaseName(), identifier.getTableName());
            if (deletedRecords == 0) {
                LOG.info("Skipping drop, table does not exist: {}", (Object)identifier);
                return;
            }
            Path path = this.getTableLocation(identifier);
            try {
                if (this.fileIO.exists(path)) {
                    this.fileIO.deleteDirectoryQuietly(path);
                }
                for (Path externalPath : externalPaths) {
                    if (!this.fileIO.exists(externalPath)) continue;
                    this.fileIO.deleteDirectoryQuietly(externalPath);
                }
            }
            catch (Exception ex) {
                LOG.error("Delete directory[{}] fail for table {}", new Object[]{path, identifier, ex});
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e);
        }
    }

    @Override
    protected void createTableImpl(Identifier identifier, Schema schema) {
        try {
            SchemaManager schemaManager = this.getSchemaManager(identifier);
            this.runWithLock(identifier, () -> schemaManager.createTable(schema));
            Path path = this.getTableLocation(identifier);
            int insertRecord = (Integer)this.connections.run(conn -> {
                try (PreparedStatement sql = conn.prepareStatement("INSERT INTO paimon_tables (catalog_key, database_name, table_name)  VALUES (?,?,?)");){
                    sql.setString(1, this.catalogKey);
                    sql.setString(2, identifier.getDatabaseName());
                    sql.setString(3, identifier.getTableName());
                    Integer n = sql.executeUpdate();
                    return n;
                }
            });
            if (insertRecord != 1) {
                try {
                    this.fileIO.deleteDirectoryQuietly(path);
                }
                catch (Exception ee) {
                    LOG.error("Delete directory[{}] fail for table {}", new Object[]{path, identifier, ee});
                }
                throw new RuntimeException(String.format("Failed to create table %s in catalog %s", identifier.getFullName(), this.catalogKey));
            }
            LOG.debug("Successfully committed to new table: {}", (Object)identifier);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
        }
    }

    @Override
    protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
        block4: {
            try {
                JdbcUtils.updateTable(this.connections, this.catalogKey, fromTable, toTable);
                Path fromPath = this.getTableLocation(fromTable);
                if (new SchemaManager(this.fileIO, fromPath).listAllIds().isEmpty()) break block4;
                Path toPath = this.getTableLocation(toTable);
                try {
                    this.fileIO.rename(fromPath, toPath);
                }
                catch (IOException e) {
                    throw new RuntimeException("Failed to rename changes of table " + toTable.getFullName() + " to underlying files.", e);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e);
            }
        }
    }

    @Override
    protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes) throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, Catalog.ColumnNotExistException {
        this.assertMainBranch(identifier);
        SchemaManager schemaManager = this.getSchemaManager(identifier);
        try {
            this.runWithLock(identifier, () -> schemaManager.commitChanges(changes));
        }
        catch (RuntimeException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException | Catalog.TableNotExistException e) {
            throw e;
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to alter table " + identifier.getFullName(), e);
        }
    }

    @Override
    protected TableSchema loadTableSchema(Identifier identifier) throws Catalog.TableNotExistException {
        this.assertMainBranch(identifier);
        if (!JdbcUtils.tableExists(this.connections, this.catalogKey, identifier.getDatabaseName(), identifier.getTableName())) {
            throw new Catalog.TableNotExistException(identifier);
        }
        Path tableLocation = this.getTableLocation(identifier);
        return this.tableSchemaInFileSystem(tableLocation, identifier.getBranchNameOrDefault()).orElseThrow(() -> new RuntimeException("There is no paimon table in " + tableLocation));
    }

    @Override
    public boolean caseSensitive() {
        return false;
    }

    @Override
    public Optional<CatalogLockFactory> defaultLockFactory() {
        return Optional.of(new JdbcCatalogLockFactory());
    }

    @Override
    public Optional<CatalogLockContext> lockContext() {
        return Optional.of(new JdbcCatalogLockContext(this.catalogKey, this.options));
    }

    public <T> T runWithLock(Identifier identifier, Callable<T> callable) throws Exception {
        if (!this.lockEnabled()) {
            return callable.call();
        }
        JdbcCatalogLock lock = new JdbcCatalogLock(this.connections, this.catalogKey, JdbcCatalogLock.checkMaxSleep(this.options.toMap()), JdbcCatalogLock.acquireTimeout(this.options.toMap()));
        return Lock.fromCatalog(lock, identifier).runWithLock(callable);
    }

    @Override
    public void close() throws Exception {
        this.connections.close();
    }

    private SchemaManager getSchemaManager(Identifier identifier) {
        return new SchemaManager(this.fileIO, this.getTableLocation(identifier));
    }

    private Map<String, String> fetchProperties(String databaseName) {
        List<Map.Entry> entries = this.fetch(row -> new AbstractMap.SimpleImmutableEntry<String, String>(row.getString("property_key"), row.getString("property_value")), "SELECT *  FROM paimon_database_properties WHERE catalog_key = ? AND database_name = ? ", this.catalogKey, databaseName);
        return ImmutableMap.builder().putAll(entries).build();
    }

    private <R> List<R> fetch(RowProducer<R> toRow, String sql, String ... args2) {
        try {
            return (List)this.connections.run(conn -> {
                ArrayList result = Lists.newArrayList();
                try (PreparedStatement preparedStatement = conn.prepareStatement(sql);){
                    for (int pos = 0; pos < args2.length; ++pos) {
                        preparedStatement.setString(pos + 1, args2[pos]);
                    }
                    try (ResultSet rs = preparedStatement.executeQuery();){
                        while (rs.next()) {
                            result.add(toRow.apply(rs));
                        }
                    }
                }
                return result;
            });
        }
        catch (SQLException e) {
            throw new RuntimeException(String.format("Failed to execute query: %s", sql), e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted in SQL query", e);
        }
    }

    @FunctionalInterface
    static interface RowProducer<R> {
        public R apply(ResultSet var1) throws SQLException;
    }
}

