package org.locationtech.geomesa.gt.partition.postgis.dialect.procedures;

import org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage;
import org.locationtech.geomesa.gt.partition.postgis.dialect.package$;
import org.locationtech.geomesa.gt.partition.postgis.dialect.package$FunctionName$;
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.PartitionTablespacesTable$;
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.WriteAheadTable$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;

/* compiled from: PartitionWriteAheadLog.scala */
/* loaded from: input_file:org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/PartitionWriteAheadLog$.class */
public final class PartitionWriteAheadLog$ implements Cpackage.SqlProcedure {
    /** Singleton instance of this object; assigned by the private constructor. */
    public static PartitionWriteAheadLog$ MODULE$;

    // Creates the single instance when the class is initialized. The result is not stored here
    // because the constructor itself publishes the new instance through MODULE$.
    static {
        new PartitionWriteAheadLog$();
    }

    /**
     * Returns the SQL statements that drop this procedure for the given feature type.
     *
     * <p>Forwards to the default implementation inherited from {@code SqlProcedure} through the
     * trait's static forwarder, mirroring how {@code create} and {@code drop} forward to
     * {@code SqlStatements.create$} / {@code SqlStatements.drop$}. The previous body invoked
     * {@code dropStatements(typeInfo)} on itself, which recursed without a base case and could
     * only end in a {@link StackOverflowError}.
     *
     * @param typeInfo metadata of the feature type whose procedure is being dropped
     * @return the statements produced by the {@code SqlProcedure} default implementation
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlProcedure, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlStatements, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.CronSchedule
    public Seq<String> dropStatements(Cpackage.TypeInfo typeInfo) {
        return Cpackage.SqlProcedure.dropStatements$(this, typeInfo);
    }

    /**
     * Creates this procedure for the given feature type.
     *
     * <p>Pure forwarder: the work is done by the static {@code SqlStatements.create$} helper,
     * which receives this object as the trait instance.
     * NOTE(review): presumably that helper executes {@code createStatements(typeInfo)} against
     * the execution context - confirm against the trait definition.
     *
     * @param typeInfo metadata of the feature type
     * @param executionContext context handed through unchanged to the trait implementation
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlStatements, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.Sql
    public void create(Cpackage.TypeInfo typeInfo, Cpackage.ExecutionContext executionContext) {
        Cpackage.SqlStatements.create$(this, typeInfo, executionContext);
    }

    /**
     * Drops this procedure for the given feature type.
     *
     * <p>Pure forwarder: the work is done by the static {@code SqlStatements.drop$} helper,
     * which receives this object as the trait instance.
     * NOTE(review): presumably that helper executes {@code dropStatements(typeInfo)} against
     * the execution context - confirm against the trait definition.
     *
     * @param typeInfo metadata of the feature type
     * @param executionContext context handed through unchanged to the trait implementation
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlStatements, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.Sql
    public void drop(Cpackage.TypeInfo typeInfo, Cpackage.ExecutionContext executionContext) {
        Cpackage.SqlStatements.drop$(this, typeInfo, executionContext);
    }

    /**
     * Derives the name of the generated procedure for a feature type.
     *
     * @param typeInfo metadata of the feature type
     * @return a function name made of the type name followed by {@code _partition_wa}
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlProcedure
    public Cpackage.FunctionName name(Cpackage.TypeInfo typeInfo) {
        final String procedureName = typeInfo.typeName() + "_partition_wa";
        return package$FunctionName$.MODULE$.apply(procedureName);
    }

    /**
     * Returns the statements needed to create this procedure.
     *
     * @param typeInfo metadata of the feature type
     * @return a single-element sequence holding the generated procedure definition
     */
    @Override // org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.SqlStatements, org.locationtech.geomesa.gt.partition.postgis.dialect.Cpackage.CronSchedule
    public Seq<String> createStatements(Cpackage.TypeInfo typeInfo) {
        final String createProcedure = proc(typeInfo);
        return new $colon.colon<>(createProcedure, Nil$.MODULE$);
    }

    private String proc(Cpackage.TypeInfo typeInfo) {
        int hoursPerPartition = typeInfo.partitions().hoursPerPartition();
        Cpackage.TableConfig writeAhead = typeInfo.tables().writeAhead();
        Cpackage.TableConfig writeAheadPartitions = typeInfo.tables().writeAheadPartitions();
        Cpackage.TableConfig mainPartitions = typeInfo.tables().mainPartitions();
        String sb = new StringBuilder(1).append(typeInfo.schema().quoted()).append(".").append(PartitionTablespacesTable$.MODULE$.Name().quoted()).toString();
        String quoted = typeInfo.cols().dtg().quoted();
        return new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(9462).append("CREATE OR REPLACE PROCEDURE ").append(name(typeInfo).quoted()).append("(cur_time timestamp without time zone) LANGUAGE plpgsql AS\n       |  $BODY$\n       |    DECLARE\n       |      min_dtg timestamp without time zone;         -- min date in our partitioned tables\n       |      main_cutoff timestamp without time zone;     -- max age of the records for main tables\n       |      write_ahead record;\n       |      partition_start timestamp without time zone; -- start bounds for the partition we're writing\n       |      partition_end timestamp without time zone;   -- end bounds for the partition we're writing\n       |      partition_name text;                         -- partition table name\n       |      partition_parent text;                       -- partition parent table name\n       |      partition_name_format text;                  -- date format for partition names\n       |      partition_tablespace text;                   -- partition tablespace\n       |      pexists boolean;                             -- table exists check\n       |      unsorted_count bigint;\n       |    BEGIN\n       |      -- constants\n       |      main_cutoff := truncate_to_partition(cur_time, ").append(hoursPerPartition).append(") - INTERVAL '").append(hoursPerPartition).append(" HOURS';\n       |\n       |      -- check for write ahead partitions and move the data into the time partitioned tables\n       |      FOR write_ahead IN\n       |        SELECT pg_class.relname AS name\n       |          FROM pg_catalog.pg_inherits\n       |          INNER JOIN pg_catalog.pg_class ON (pg_inherits.inhrelid = pg_class.oid)\n       |          INNER JOIN pg_catalog.pg_namespace ON (pg_class.relnamespace = pg_namespace.oid)\n       |          WHERE inhparent = ").append(writeAhead.name().asRegclass()).append("\n       |          AND relname != 
").append(WriteAheadTable$.MODULE$.writesPartition(typeInfo).asLiteral()).append("\n       |          ORDER BY name\n       |      LOOP\n       |\n       |        RAISE INFO '% Checking write ahead table %', timeofday()::timestamp, write_ahead.name;\n       |        -- get a lock on the table - this mode won't prevent reads but will prevent writes\n       |        -- (there shouldn't be any writes though) and will synchronize this method\n       |        LOCK TABLE ONLY ").append(mainPartitions.name().qualified()).append(" IN SHARE UPDATE EXCLUSIVE MODE;\n       |        EXECUTE 'LOCK TABLE ").append(typeInfo.schema().quoted()).append(".' || quote_ident(write_ahead.name) ||\n       |          ' IN SHARE UPDATE EXCLUSIVE MODE';\n       |        RAISE INFO '% Locked write ahead table % for migration', timeofday()::timestamp, write_ahead.name;\n       |\n       |        -- wait until the table doesn't contain any recent records\n       |        EXECUTE 'SELECT EXISTS(SELECT 1 FROM ").append(typeInfo.schema().quoted()).append(".' || quote_ident(write_ahead.name) ||\n       |          ' WHERE ").append(quoted).append(" >= ' || quote_literal(truncate_to_ten_minutes(cur_time)) || ')'\n       |          INTO pexists;\n       |\n       |        IF pexists THEN\n       |          -- should only happen if data is inserted with timestamps from the future\n       |          RAISE NOTICE '% Skipping write ahead table % due to min date', timeofday()::timestamp, write_ahead.name;\n       |        ELSE\n       |          partition_end := '-infinity'::timestamp without time zone;\n       |          LOOP\n       |            -- find the range of dates in the write ahead partition\n       |            EXECUTE 'SELECT min(").append(quoted).append(") FROM ").append(typeInfo.schema().quoted()).append(".' 
|| quote_ident(write_ahead.name) ||\n       |              ' WHERE ").append(quoted).append(" >= ' || quote_literal(partition_end) INTO min_dtg;\n       |            EXIT WHEN min_dtg IS NULL;\n       |\n       |            -- calculate the partition bounds for the min date\n       |            IF min_dtg < main_cutoff THEN\n       |              partition_start := truncate_to_partition(min_dtg, ").append(hoursPerPartition).append(");\n       |              partition_end := partition_start + INTERVAL '").append(hoursPerPartition).append(" HOURS';\n       |              partition_parent := ").append(mainPartitions.name().asLiteral()).append(";\n       |              partition_name_format := 'YYYY_MM_DD_HH24';\n       |              SELECT table_space INTO partition_tablespace FROM ").append(sb).append("\n       |                WHERE type_name = ").append(package$.MODULE$.literal(typeInfo.typeName())).append(" AND table_type = ").append(package$.MODULE$.PartitionedTableSuffix().quoted()).append(";\n       |              IF partition_tablespace IS NULL THEN\n       |                partition_tablespace := '").append(mainPartitions.storage().opts()).append("';\n       |              ELSE\n       |                partition_tablespace := '").append(mainPartitions.storage().opts()).append(" TABLESPACE ' ||\n       |                  quote_ident(partition_tablespace);\n       |              END IF;\n       |            ELSE\n       |              partition_start := truncate_to_ten_minutes(min_dtg);\n       |              partition_end := partition_start + INTERVAL '10 MINUTES';\n       |              partition_parent := ").append(writeAheadPartitions.name().asLiteral()).append(";\n       |              partition_name_format := 'YYYY_MM_DD_HH24_MI';\n       |              SELECT table_space INTO partition_tablespace FROM ").append(sb).append("\n       |                WHERE type_name = ").append(package$.MODULE$.literal(typeInfo.typeName())).append(" AND table_type = 
").append(package$.MODULE$.PartitionedWriteAheadTableSuffix().quoted()).append(";\n       |              IF partition_tablespace IS NULL THEN\n       |                partition_tablespace := '").append(writeAheadPartitions.storage().opts()).append("';\n       |              ELSE\n       |                partition_tablespace := '").append(writeAheadPartitions.storage().opts()).append(" TABLESPACE ' ||\n       |                  quote_ident(partition_tablespace);\n       |              END IF;\n       |            END IF;\n       |\n       |            partition_name := partition_parent || '_' || to_char(partition_start, partition_name_format);\n       |\n       |            RAISE INFO '% Writing to partition %', timeofday()::timestamp, partition_name;\n       |\n       |            SELECT EXISTS(SELECT FROM pg_tables WHERE schemaname = ").append(typeInfo.schema().asLiteral()).append(" AND tablename = partition_name)\n       |              INTO pexists;\n       |\n       |            -- create the partition table if it doesn't exist\n       |            -- note: normally the partition will not exist unless time-latent data was inserted\n       |            -- we create it unattached so that it doesn't lock the _recent table on insert\n       |            -- then we attach it after inserting the rows\n       |            -- since this is all within a transaction it should all happen \"at once\"\n       |            -- see https://www.postgresql.org/docs/13/ddl-partitioning.html#DDL-PARTITIONING-DECLARATIVE-MAINTENANCE\n       |            IF NOT pexists THEN\n       |              RAISE INFO '% Creating partition % (unattached)', timeofday()::timestamp, partition_name;\n       |              -- upper bounds are exclusive\n       |              EXECUTE 'CREATE TABLE ").append(typeInfo.schema().quoted()).append(".' || quote_ident(partition_name) ||\n       |                ' (LIKE ").append(typeInfo.schema().quoted()).append(".' 
|| quote_ident(partition_parent) ||\n       |                ' INCLUDING DEFAULTS INCLUDING CONSTRAINTS)' || partition_tablespace;\n       |              -- creating a constraint allows it to be attached to the parent without any additional checks\n       |              EXECUTE 'ALTER TABLE  ").append(typeInfo.schema().quoted()).append(".' || quote_ident(partition_name) ||\n       |                ' ADD CONSTRAINT ' || quote_ident(partition_name || '_constraint') ||\n       |                ' CHECK ( ").append(quoted).append(" >= ' || quote_literal(partition_start) ||\n       |                ' AND ").append(quoted).append(" < ' || quote_literal(partition_end) || ' );';\n       |            END IF;\n       |\n       |            -- copy rows from write ahead table to partition table\n       |            RAISE INFO '% Copying rows to partition %', timeofday()::timestamp, partition_name;\n       |            EXECUTE 'INSERT INTO ").append(typeInfo.schema().quoted()).append(".' || quote_ident(partition_name) ||\n       |              ' SELECT * FROM ' || quote_ident(write_ahead.name) ||\n       |              '   WHERE ").append(quoted).append(" >= ' || quote_literal(partition_start) ||\n       |              '     AND ").append(quoted).append(" < ' || quote_literal(partition_end) ||\n       |              '   ORDER BY st_geohash(").append(typeInfo.cols().geom().quoted()).append("), ").append(quoted).append("' ||\n       |              '   ON CONFLICT DO NOTHING';\n       |            RAISE INFO '% Done copying rows to partition %', timeofday()::timestamp, partition_name;\n       |\n       |            -- attach the partition table to the parent\n       |            IF NOT pexists THEN\n       |              RAISE INFO '% Attaching partition % to parent', timeofday()::timestamp, partition_name;\n       |              EXECUTE 'ALTER TABLE ").append(typeInfo.schema().quoted()).append(".' 
|| quote_ident(partition_parent) ||\n       |                ' ATTACH PARTITION ").append(typeInfo.schema().quoted()).append(".' || quote_ident(partition_name) ||\n       |                ' FOR VALUES FROM (' || quote_literal(partition_start) ||\n       |                ') TO (' || quote_literal(partition_end) || ' );';\n       |              -- once the table is attached we can drop the redundant constraint\n       |              EXECUTE 'ALTER TABLE ").append(typeInfo.schema().quoted()).append(".' || quote_ident(partition_name) ||\n       |                ' DROP CONSTRAINT ' || quote_ident(partition_name || '_constraint');\n       |              RAISE NOTICE 'A partition has been created %', partition_name;\n       |            ELSIF partition_parent = ").append(mainPartitions.name().asLiteral()).append(" THEN\n       |              -- store record of unsorted row counts which could negatively impact BRIN index scans\n       |              GET DIAGNOSTICS unsorted_count := ROW_COUNT;\n       |              INSERT INTO ").append(typeInfo.tables().sortQueue().name().qualified()).append("(partition_name, unsorted_count, enqueued)\n       |                VALUES (partition_name, unsorted_count, now());\n       |              RAISE NOTICE 'Inserting % rows into existing partition %, queries may be impacted',\n       |                unsorted_count, partition_name;\n       |            END IF;\n       |\n       |            -- mark the partition to be analyzed in a separate thread\n       |            INSERT INTO ").append(typeInfo.tables().analyzeQueue().name().qualified()).append("(partition_name, enqueued)\n       |              VALUES (partition_name, now());\n       |          END LOOP;\n       |\n       |          RAISE INFO '% Dropping write ahead table %', timeofday()::timestamp, write_ahead.name;\n       |          EXECUTE 'DROP TABLE ' || quote_ident(write_ahead.name);\n       |\n       |        END IF;\n       |\n       |        COMMIT; -- releases the 
lock\n       |      END LOOP;\n       |    END;\n       |  $BODY$;\n       |").toString())).stripMargin();
    }

    /**
     * Private constructor, invoked once from the static initializer.
     *
     * <p>Publishes the new instance through {@code MODULE$} first, then runs the initializers of
     * the {@code SqlStatements} and {@code SqlProcedure} traits on it.
     */
    private PartitionWriteAheadLog$() {
        MODULE$ = this;
        Cpackage.SqlStatements.$init$(this);
        Cpackage.SqlProcedure.$init$((Cpackage.SqlProcedure) this);
    }
}
