package org.locationtech.geomesa.gt.partition.postgis.dialect.procedures;

import org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage;
import org.locationtech.geomesa.gt.partition.postgis.dialect.package$;
import org.locationtech.geomesa.gt.partition.postgis.dialect.package$FunctionName$;
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.PartitionTablespacesTable$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;

/* compiled from: MergeWriteAheadPartitions.scala */
/* loaded from: input_file:org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/MergeWriteAheadPartitions$.class */
public final class MergeWriteAheadPartitions$ implements Cpackage.SqlProcedure {
    /**
     * The single shared instance of this class. It is not assigned here: the private
     * constructor stores {@code this} into the field when the static initializer below runs.
     */
    public static MergeWriteAheadPartitions$ MODULE$;

    // Instantiate once at class-load time; the result is deliberately not assigned here
    // because the constructor itself publishes the new instance to MODULE$.
    static {
        new MergeWriteAheadPartitions$();
    }

    /**
     * Returns the drop statements for this procedure, as produced by the implementation
     * inherited from {@code Cpackage.SqlProcedure}.
     *
     * <p>The call must be an explicit super-interface invocation. A plain
     * {@code dropStatements(typeInfo)} would dispatch back to this override and recurse
     * until a {@link StackOverflowError} is thrown.
     *
     * <p>NOTE(review): this assumes {@code Cpackage.SqlProcedure} supplies a default
     * (non-abstract) {@code dropStatements}; the interface is not visible in this file,
     * so confirm against its declaration.
     *
     * @param typeInfo metadata for the feature type, passed through unchanged
     * @return whatever the inherited implementation returns for {@code typeInfo}
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlProcedure, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlStatements, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.CronSchedule
    public Seq<String> dropStatements(Cpackage.TypeInfo typeInfo) {
        // SqlProcedure is a direct superinterface of this class, so the qualified super call is legal.
        return Cpackage.SqlProcedure.super.dropStatements(typeInfo);
    }

    /**
     * Forwards to the static {@code create$} helper on {@code Cpackage.SqlStatements},
     * passing this object as the receiver.
     *
     * <p>NOTE(review): the helper's body is not visible in this file; presumably it runs
     * {@link #createStatements} against the execution context — confirm.
     *
     * @param typeInfo metadata for the feature type, passed through unchanged
     * @param executionContext context handed to the helper, passed through unchanged
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlStatements, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.Sql
    public void create(Cpackage.TypeInfo typeInfo, Cpackage.ExecutionContext executionContext) {
        Cpackage.SqlStatements.create$(this, typeInfo, executionContext);
    }

    /**
     * Forwards to the static {@code drop$} helper on {@code Cpackage.SqlStatements},
     * passing this object as the receiver.
     *
     * <p>NOTE(review): the helper's body is not visible in this file; presumably it runs
     * {@link #dropStatements} against the execution context — confirm.
     *
     * @param typeInfo metadata for the feature type, passed through unchanged
     * @param executionContext context handed to the helper, passed through unchanged
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlStatements, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.Sql
    public void drop(Cpackage.TypeInfo typeInfo, Cpackage.ExecutionContext executionContext) {
        Cpackage.SqlStatements.drop$(this, typeInfo, executionContext);
    }

    /**
     * Builds the name of the merge procedure for a feature type.
     *
     * @param typeInfo metadata for the feature type; only its type name is read here
     * @return the type name followed by {@code _merge_wa_partitions}, wrapped as a function name
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlProcedure
    public Cpackage.FunctionName name(Cpackage.TypeInfo typeInfo) {
        final String procedureName = typeInfo.typeName() + "_merge_wa_partitions";
        return package$FunctionName$.MODULE$.apply(procedureName);
    }

    /**
     * Returns the statements that define this procedure.
     *
     * @param typeInfo metadata for the feature type the procedure is generated for
     * @return a one-element list holding the procedure definition built by {@code proc}
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlStatements, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.CronSchedule
    public Seq<String> createStatements(Cpackage.TypeInfo typeInfo) {
        final String procedureDdl = proc(typeInfo);
        // A cons cell onto the empty list, i.e. a list with exactly one element.
        return new $colon.colon<>(procedureDdl, Nil$.MODULE$);
    }

    /**
     * Renders the {@code CREATE OR REPLACE PROCEDURE} statement (PL/pgSQL) for the merge procedure.
     *
     * <p>The generated procedure takes a single {@code cur_time} timestamp. As spelled out in the
     * template below, it loops while the write-ahead partitions still hold rows older than
     * {@code main_cutoff}; on each pass it:
     * <ol>
     *   <li>finds the oldest such row and derives the bounds and name of the main partition it belongs to;</li>
     *   <li>creates that partition table (with a temporary check constraint) if it does not exist yet;</li>
     *   <li>copies the rows from the matching write-ahead partitions, ordered by geohash and date;</li>
     *   <li>attaches a newly created partition to the main partitioned table, or otherwise records
     *       the inserted row count in the sort queue;</li>
     *   <li>drops the copied write-ahead partitions, enqueues the partition for analysis and commits.</li>
     * </ol>
     *
     * <p>The template is written with {@code |} margins; {@code stripMargin()} removes them from
     * the final text.
     *
     * @param typeInfo metadata supplying the schema, table names, column names and partition size
     *                 that are spliced into the template
     * @return the complete procedure definition as a single SQL string
     */
    private String proc(Cpackage.TypeInfo typeInfo) {
        // Partition width in hours; used both for truncation and for the INTERVAL arithmetic.
        int hoursPerPartition = typeInfo.partitions().hoursPerPartition();
        // Source (write-ahead) and destination (main) partitioned tables.
        Cpackage.TableConfig writeAheadPartitions = typeInfo.tables().writeAheadPartitions();
        Cpackage.TableConfig mainPartitions = typeInfo.tables().mainPartitions();
        // Schema-qualified, quoted name of the partition tablespaces table.
        String sb = new StringBuilder(1).append(typeInfo.schema().quoted()).append(".").append(PartitionTablespacesTable$.MODULE$.Name().quoted()).toString();
        // Quoted name of the date column (dtg) that partitioning is based on.
        String quoted = typeInfo.cols().dtg().quoted();
        return new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(7210).append("CREATE OR REPLACE PROCEDURE ").append(name(typeInfo).quoted()).append("(cur_time timestamp without time zone) LANGUAGE plpgsql AS\n       |  $BODY$\n       |    DECLARE\n       |      min_dtg timestamp without time zone;         -- min date in our partitioned tables\n       |      main_cutoff timestamp without time zone;     -- max age of the records for main tables\n       |      partition_start timestamp without time zone; -- start bounds for the partition we're writing\n       |      partition_end timestamp without time zone;   -- end bounds for the partition we're writing\n       |      partition_name text;                         -- partition table name\n       |      partition_tablespace text;                   -- partition tablespace\n       |      write_ahead_partitions text[];               -- names of the partitions we're migrating\n       |      write_ahead_partition text;                  -- name of current partition\n       |      pexists boolean;                             -- table exists check\n       |      unsorted_count bigint;\n       |    BEGIN\n       |      -- constants\n       |      main_cutoff := truncate_to_partition(cur_time, ").append(hoursPerPartition).append(") - INTERVAL '").append(hoursPerPartition).append(" HOURS';\n       |\n       |      -- move data from the write ahead partitions to the main partitions\n       |      LOOP\n       |        -- find the range of dates in the write ahead partition tables\n       |        SELECT min(").append(quoted).append(") INTO min_dtg FROM ").append(writeAheadPartitions.name().qualified()).append("\n       |          WHERE ").append(quoted).append(" < main_cutoff;\n       |        EXIT WHEN min_dtg IS NULL;\n       |\n       |        partition_start := truncate_to_partition(min_dtg, ").append(hoursPerPartition).append(");\n       |        partition_end := partition_start + INTERVAL 
'").append(hoursPerPartition).append(" HOURS';\n       |        partition_name := ").append(mainPartitions.name().asLiteral()).append(" || '_' || to_char(partition_start, 'YYYY_MM_DD_HH24');\n       |\n       |        SELECT EXISTS(SELECT FROM pg_tables WHERE schemaname = ").append(typeInfo.schema().asLiteral()).append(" AND tablename = partition_name)\n       |          INTO pexists;\n       |\n       |        -- create the partition table if it doesn't exist\n       |        IF NOT pexists THEN\n       |          SELECT table_space INTO partition_tablespace FROM ").append(sb).append("\n       |            WHERE type_name = ").append(package$.MODULE$.literal(typeInfo.typeName())).append(" AND table_type = ").append(package$.MODULE$.PartitionedTableSuffix().quoted()).append(";\n       |          IF partition_tablespace IS NULL THEN\n       |            partition_tablespace := '';\n       |          ELSE\n       |            partition_tablespace := ' TABLESPACE ' || quote_ident(partition_tablespace);\n       |          END IF;\n       |          -- upper bounds are exclusive\n       |          -- this won't have any indices until we attach it to the parent partition table\n       |          EXECUTE 'CREATE TABLE ").append(typeInfo.schema().quoted()).append(".' || quote_ident(partition_name) ||\n       |            ' (LIKE ").append(mainPartitions.name().qualified()).append(" INCLUDING DEFAULTS INCLUDING CONSTRAINTS)' ||\n       |            partition_tablespace;\n       |          -- creating a constraint allows it to be attached to the parent without any additional checks\n       |          EXECUTE 'ALTER TABLE  ").append(typeInfo.schema().quoted()).append(".' 
|| quote_ident(partition_name) ||\n       |            ' ADD CONSTRAINT ' || quote_ident(partition_name || '_constraint') ||\n       |            ' CHECK ( ").append(quoted).append(" >= ' || quote_literal(partition_start) ||\n       |            ' AND ").append(quoted).append(" < ' || quote_literal(partition_end) || ' );';\n       |        END IF;\n       |\n       |        -- find the write ahead partitions we're copying from\n       |        -- order the results to ensure we get locks in a consistent order to avoid deadlocks\n       |        write_ahead_partitions := Array(\n       |          SELECT '").append(typeInfo.schema().quoted()).append(".' || quote_ident(pg_class.relname)\n       |            FROM pg_catalog.pg_inherits\n       |            INNER JOIN pg_catalog.pg_class ON (pg_inherits.inhrelid = pg_class.oid)\n       |            INNER JOIN pg_catalog.pg_namespace ON (pg_class.relnamespace = pg_namespace.oid)\n       |            WHERE inhparent = ").append(writeAheadPartitions.name().asRegclass()).append("\n       |              AND pg_class.relname >= ").append(writeAheadPartitions.name().asLiteral()).append(" || '_' || to_char(partition_start, 'YYYY_MM_DD_HH24_MI')\n       |              AND pg_class.relname < ").append(writeAheadPartitions.name().asLiteral()).append(" || '_' || to_char(partition_end, 'YYYY_MM_DD_HH24_MI')\n       |            ORDER BY 1\n       |          );\n       |\n       |        -- get a lock on the tables - this mode won't prevent reads but will prevent writes\n       |        -- (there shouldn't be any writes though) and will synchronize this method\n       |        -- TODO we really just need to sync this method for safety in manual invocations\n       |        -- FOREACH write_ahead_partition IN ARRAY write_ahead_partitions LOOP\n       |        --   EXECUTE 'LOCK TABLE ' || write_ahead_partition || ' IN SHARE ROW EXCLUSIVE MODE';\n       |        --   RAISE INFO '% Locked write ahead partition % for migration', 
timeofday()::timestamp, write_ahead_partition;\n       |        -- END LOOP;\n       |\n       |        -- create a view from the tables so that we can sort the result by an expression (geohash)\n       |        EXECUTE 'CREATE TEMP VIEW ' || quote_ident(partition_name || '_tmp_migrate') ||\n       |          ' AS SELECT * FROM ' || array_to_string(write_ahead_partitions, ' UNION ALL SELECT * FROM ');\n       |\n       |        -- copy rows from write ahead partitions to main partition table\n       |        EXECUTE 'INSERT INTO ").append(typeInfo.schema().quoted()).append(".' || quote_ident(partition_name) ||\n       |          ' SELECT * FROM ' || quote_ident(partition_name || '_tmp_migrate') ||\n       |          '   ORDER BY st_geohash(").append(typeInfo.cols().geom().quoted()).append("), ").append(quoted).append("' ||\n       |          '   ON CONFLICT DO NOTHING';\n       |\n       |        IF NOT pexists THEN\n       |          EXECUTE 'ALTER TABLE ").append(mainPartitions.name().qualified()).append("' ||\n       |            ' ATTACH PARTITION ").append(typeInfo.schema().quoted()).append(".' || quote_ident(partition_name) ||\n       |            ' FOR VALUES FROM (' || quote_literal(partition_start) ||\n       |            ') TO (' || quote_literal(partition_end) || ' );';\n       |          -- now that we've attached the table we can drop the redundant constraint\n       |          EXECUTE 'ALTER TABLE ").append(typeInfo.schema().quoted()).append(".' 
|| quote_ident(partition_name) ||\n       |            ' DROP CONSTRAINT ' || quote_ident(partition_name || '_constraint');\n       |          RAISE NOTICE 'A partition has been created %', partition_name;\n       |        ELSE\n       |          -- store record of unsorted row counts which could negatively impact BRIN index scans\n       |          GET DIAGNOSTICS unsorted_count := ROW_COUNT;\n       |          INSERT INTO ").append(typeInfo.tables().sortQueue().name().qualified()).append("(partition_name, unsorted_count, enqueued)\n       |            VALUES (partition_name, unsorted_count, now());\n       |          RAISE NOTICE 'Inserting % rows into existing partition %, queries may be impacted',\n       |                unsorted_count, partition_name;\n       |        END IF;\n       |\n       |        -- drop the tables that we've copied out\n       |        EXECUTE 'DROP VIEW ' || quote_ident(partition_name || '_tmp_migrate');\n       |        FOREACH write_ahead_partition IN ARRAY write_ahead_partitions LOOP\n       |          EXECUTE 'DROP TABLE ' || write_ahead_partition;\n       |          RAISE NOTICE 'A partition has been deleted %', write_ahead_partition;\n       |        END LOOP;\n       |\n       |        -- mark the partition to be analyzed in a separate thread\n       |        INSERT INTO ").append(typeInfo.tables().analyzeQueue().name().qualified()).append("(partition_name, enqueued)\n       |          VALUES (partition_name, now());\n       |\n       |        -- commit after each move, also releases the table locks\n       |        COMMIT;\n       |\n       |      END LOOP;\n       |    END;\n       |  $BODY$;\n       |").toString())).stripMargin();
    }

    /**
     * Private so that the only instance is the one created by this class's static initializer.
     * Publishes the new instance to {@link #MODULE$} and then runs the {@code $init$} hooks of
     * {@code Cpackage.SqlStatements} and {@code Cpackage.SqlProcedure}, in that order.
     */
    private MergeWriteAheadPartitions$() {
        // Publish first, so MODULE$ is already set while the $init$ hooks below run.
        MODULE$ = this;
        Cpackage.SqlStatements.$init$(this);
        Cpackage.SqlProcedure.$init$((Cpackage.SqlProcedure) this);
    }
}
