/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.mergetree.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ArrayUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;

public class PartialUpdateMergeFunction
implements MergeFunction<KeyValue> {
    public static final String SEQUENCE_GROUP = "sequence-group";
    private final InternalRow.FieldGetter[] getters;
    private final boolean ignoreDelete;
    private final Map<Integer, FieldsComparator> fieldSeqComparators;
    private final boolean fieldSequenceEnabled;
    private final Map<Integer, FieldAggregator> fieldAggregators;
    private final boolean removeRecordOnDelete;
    private final Set<Integer> sequenceGroupPartialDelete;
    private final boolean[] nullables;
    private InternalRow currentKey;
    private long latestSequenceNumber;
    private GenericRow row;
    private KeyValue reused;
    private boolean currentDeleteRow;
    private boolean notNullColumnFilled;
    private boolean meetInsert;

    /**
     * Builds a merge function over a row with {@code getters.length} fields.
     *
     * @param getters one getter per field position of the merged row
     * @param ignoreDelete when {@code true}, retract records are skipped by {@link #add}
     * @param fieldSeqComparators sequence-group comparator keyed by field position; when empty,
     *     non-retract records are merged by plain non-null overwrite
     * @param fieldAggregators aggregator keyed by field position
     * @param fieldSequenceEnabled when {@code true}, retract records are handled per sequence
     *     group
     * @param removeRecordOnDelete when {@code true}, a {@code DELETE} record marks the whole row
     *     as deleted
     * @param sequenceGroupPartialDelete sequence field positions for which a {@code DELETE}
     *     record removes the whole row
     * @param nullables {@code nullables[i]} is {@code false} when field {@code i} must never be
     *     null
     */
    protected PartialUpdateMergeFunction(
            InternalRow.FieldGetter[] getters,
            boolean ignoreDelete,
            Map<Integer, FieldsComparator> fieldSeqComparators,
            Map<Integer, FieldAggregator> fieldAggregators,
            boolean fieldSequenceEnabled,
            boolean removeRecordOnDelete,
            Set<Integer> sequenceGroupPartialDelete,
            boolean[] nullables) {
        // Row layout.
        this.getters = getters;
        this.nullables = nullables;

        // Per-field merge strategies.
        this.fieldSeqComparators = fieldSeqComparators;
        this.fieldAggregators = fieldAggregators;
        this.fieldSequenceEnabled = fieldSequenceEnabled;

        // Delete handling.
        this.ignoreDelete = ignoreDelete;
        this.removeRecordOnDelete = removeRecordOnDelete;
        this.sequenceGroupPartialDelete = sequenceGroupPartialDelete;
    }

    /**
     * Clears all per-key state: a fresh empty working row, no key, sequence number zero, both
     * progress flags cleared and every field aggregator reset.
     */
    @Override
    public void reset() {
        this.row = new GenericRow(this.getters.length);
        this.currentKey = null;
        this.latestSequenceNumber = 0L;
        this.notNullColumnFilled = false;
        this.meetInsert = false;
        for (FieldAggregator aggregator : this.fieldAggregators.values()) {
            aggregator.reset();
        }
    }

    /**
     * Merges one record into the working row of the current key.
     *
     * <p>Non-retract records update the row (plain non-null overwrite when no sequence group is
     * configured, sequence-group aware merge otherwise). Retract records are, in this order:
     * used to seed the non-nullable columns if nothing has been merged yet, dropped when
     * {@code ignoreDelete} is set, retracted per sequence group when field sequences are
     * enabled, or turned into a whole-row delete when {@code removeRecordOnDelete} is set.
     *
     * @param kv the record to merge
     * @throws IllegalArgumentException if a retract record arrives and none of the delete
     *     handling options is configured, or a non-nullable field is null
     */
    @Override
    public void add(KeyValue kv) {
        this.currentKey = kv.key();
        this.currentDeleteRow = false;
        RowKind kind = kv.valueKind();

        if (!kind.isRetract()) {
            this.latestSequenceNumber = kv.sequenceNumber();
            if (this.fieldSeqComparators.isEmpty()) {
                this.updateNonNullFields(kv);
            } else {
                this.updateWithSequenceGroup(kv);
            }
            this.meetInsert = true;
            this.notNullColumnFilled = true;
            return;
        }

        // Retract path: make sure non-nullable columns carry a value even if no insert was seen.
        if (!this.notNullColumnFilled) {
            this.initRow(this.row, kv.value());
            this.notNullColumnFilled = true;
        }
        if (this.ignoreDelete) {
            return;
        }

        this.latestSequenceNumber = kv.sequenceNumber();
        if (this.fieldSequenceEnabled) {
            this.retractWithSequenceGroup(kv);
            return;
        }

        if (!this.removeRecordOnDelete) {
            String msg = String.join("\n", "By default, Partial update can not accept delete records, you can choose one of the following solutions:", "1. Configure 'ignore-delete' to ignore delete records.", "2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.", "3. Configure 'sequence-group's to retract partial columns.");
            throw new IllegalArgumentException(msg);
        }

        // Only DELETE (not other retract kinds) removes the record.
        if (kind == RowKind.DELETE) {
            this.currentDeleteRow = true;
            this.row = new GenericRow(this.getters.length);
            this.initRow(this.row, kv.value());
        }
    }

    /**
     * Overwrites every field of the working row for which the incoming record carries a
     * non-null value; null values leave the current value untouched.
     *
     * @throws IllegalArgumentException if the incoming value of a non-nullable field is null
     */
    private void updateNonNullFields(KeyValue kv) {
        for (int pos = 0; pos < this.getters.length; pos++) {
            Object incoming = this.getters[pos].getFieldOrNull(kv.value());
            if (incoming == null) {
                if (!this.nullables[pos]) {
                    throw new IllegalArgumentException("Field " + pos + " can not be null");
                }
            } else {
                this.row.setField(pos, incoming);
            }
        }
    }

    /**
     * Merges a non-retract record field by field, honouring sequence groups.
     *
     * <ul>
     *   <li>Field without a sequence group: aggregated if it has an aggregator, otherwise
     *       overwritten by a non-null incoming value.
     *   <li>Field in a sequence group whose sequence fields are all null in the record: skipped.
     *   <li>Record not older than the row (comparator result {@code >= 0}): a sequence field
     *       refreshes all sequence fields of its group at once; any other field is aggregated
     *       or overwritten.
     *   <li>Record older than the row: only aggregated fields change, through
     *       {@code aggReversed}.
     * </ul>
     */
    private void updateWithSequenceGroup(KeyValue kv) {
        for (int pos = 0; pos < this.getters.length; pos++) {
            Object incoming = this.getters[pos].getFieldOrNull(kv.value());
            FieldsComparator groupComparator = this.fieldSeqComparators.get(pos);
            FieldAggregator agg = this.fieldAggregators.get(pos);
            Object current = this.getters[pos].getFieldOrNull(this.row);

            if (groupComparator == null) {
                if (agg != null) {
                    this.row.setField(pos, agg.agg(current, incoming));
                } else if (incoming != null) {
                    this.row.setField(pos, incoming);
                }
            } else if (!this.isEmptySequenceGroup(kv, groupComparator)) {
                if (groupComparator.compare(kv.value(), this.row) < 0) {
                    // Late record: only aggregations can still take it into account.
                    if (agg != null) {
                        this.row.setField(pos, agg.aggReversed(current, incoming));
                    }
                } else {
                    int[] sequenceFields = groupComparator.compareFields();
                    boolean isSequenceField = false;
                    for (int sequenceField : sequenceFields) {
                        if (sequenceField == pos) {
                            isSequenceField = true;
                            break;
                        }
                    }
                    if (isSequenceField) {
                        // All sequence fields of the group move forward together.
                        for (int sequenceField : sequenceFields) {
                            this.row.setField(sequenceField, this.getters[sequenceField].getFieldOrNull(kv.value()));
                        }
                    } else {
                        this.row.setField(pos, agg == null ? incoming : agg.agg(current, incoming));
                    }
                }
            }
        }
    }

    /**
     * Tells whether the record carries no sequence value for a group.
     *
     * @return {@code true} when every field listed by {@code comparator.compareFields()} is null
     *     in the record's value, {@code false} as soon as one of them is non-null
     */
    private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) {
        InternalRow value = kv.value();
        for (int sequenceField : comparator.compareFields()) {
            if (this.getters[sequenceField].getFieldOrNull(value) != null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Applies a retract record field by field for fields that belong to a sequence group.
     *
     * <p>Fields without a sequence group, and groups whose sequence fields are all null in the
     * record, are left untouched. When the record is not older than the row and the field is
     * itself a sequence field, the group's sequence fields are refreshed from the record (each
     * at most once per call); if the record is a {@code DELETE} and the sequence field is
     * registered for partial delete, the whole row is replaced by a delete row and processing
     * stops. Other fields are retracted through their aggregator, or set to null when they have
     * no aggregator and the record is not older than the row.
     */
    private void retractWithSequenceGroup(KeyValue kv) {
        Set<Integer> refreshedSequenceFields = new HashSet<Integer>();
        for (int pos = 0; pos < this.getters.length; pos++) {
            FieldsComparator groupComparator = this.fieldSeqComparators.get(pos);
            if (groupComparator == null || this.isEmptySequenceGroup(kv, groupComparator)) {
                continue;
            }

            boolean notOlder = groupComparator.compare(kv.value(), this.row) >= 0;
            final int target = pos;
            if (notOlder && Arrays.stream(groupComparator.compareFields()).anyMatch(seq -> seq == target)) {
                for (int sequenceField : groupComparator.compareFields()) {
                    if (!refreshedSequenceFields.contains(sequenceField)) {
                        if (kv.valueKind() == RowKind.DELETE && this.sequenceGroupPartialDelete.contains(sequenceField)) {
                            // Partial delete on this group removes the whole record.
                            this.currentDeleteRow = true;
                            this.row = new GenericRow(this.getters.length);
                            this.initRow(this.row, kv.value());
                            return;
                        }
                        this.row.setField(sequenceField, this.getters[sequenceField].getFieldOrNull(kv.value()));
                        refreshedSequenceFields.add(sequenceField);
                    }
                }
                continue;
            }

            FieldAggregator agg = this.fieldAggregators.get(pos);
            if (agg != null) {
                // Aggregated fields are retracted regardless of the record's age.
                Object current = this.getters[pos].getFieldOrNull(this.row);
                this.row.setField(pos, agg.retract(current, this.getters[pos].getFieldOrNull(kv.value())));
            } else if (notOlder) {
                this.row.setField(pos, null);
            }
        }
    }

    /**
     * Copies the non-nullable fields of {@code value} into {@code row}; nullable fields are not
     * written.
     *
     * @throws IllegalArgumentException if a non-nullable field is null in {@code value}
     */
    private void initRow(GenericRow row, InternalRow value) {
        for (int pos = 0; pos < this.getters.length; pos++) {
            Object fieldValue = this.getters[pos].getFieldOrNull(value);
            if (!this.nullables[pos]) {
                if (fieldValue == null) {
                    throw new IllegalArgumentException("Field " + pos + " can not be null");
                }
                row.setField(pos, fieldValue);
            }
        }
    }

    /**
     * Returns the merged record for the current key in a lazily created, reused
     * {@link KeyValue}.
     *
     * <p>The row kind is {@code DELETE} when the last added record marked the row as deleted or
     * when no non-retract record has been merged since the last {@link #reset()}; otherwise it
     * is {@code INSERT}.
     */
    @Override
    public KeyValue getResult() {
        KeyValue result = this.reused;
        if (result == null) {
            result = new KeyValue();
            this.reused = result;
        }
        boolean deleted = this.currentDeleteRow || !this.meetInsert;
        RowKind rowKind;
        if (deleted) {
            rowKind = RowKind.DELETE;
        } else {
            rowKind = RowKind.INSERT;
        }
        return result.replace(this.currentKey, this.latestSequenceNumber, rowKind, this.row);
    }

    /**
     * Always returns {@code false}.
     *
     * <p>NOTE(review): presumably this tells the caller that records passed to {@link #add} do
     * not need to be copied first; confirm against the {@code MergeFunction} contract.
     */
    @Override
    public boolean requireCopy() {
        return false;
    }

    /**
     * Creates the factory that builds {@link PartialUpdateMergeFunction} instances.
     *
     * @param options table options; sequence groups, delete handling and aggregation functions
     *     are read from them
     * @param rowType row type of the table value
     * @param primaryKeys primary key field names
     * @return a new factory
     */
    public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {
        return new Factory(options, rowType, primaryKeys);
    }

    private static class Factory
    implements MergeFunctionFactory<KeyValue> {
        private static final long serialVersionUID = 1L;
        private final boolean ignoreDelete;
        private final RowType rowType;
        private final List<DataType> tableTypes;
        private final Map<Integer, Supplier<FieldsComparator>> fieldSeqComparators;
        private final Map<Integer, Supplier<FieldAggregator>> fieldAggregators;
        private final boolean removeRecordOnDelete;
        private final String removeRecordOnSequenceGroup;
        private Set<Integer> sequenceGroupPartialDelete;

        /**
         * Parses the partial-update configuration.
         *
         * <p>Every option whose key starts with {@code fields} and ends with
         * {@code sequence-group} declares one sequence group: the comma separated names between
         * the prefix and the suffix are the sequence fields, the comma separated option value
         * lists the fields governed by them. Both the governed fields and the sequence fields
         * themselves are registered with the group's comparator supplier.
         *
         * @param options table options
         * @param rowType row type of the table value
         * @param primaryKeys primary key field names
         * @throws IllegalArgumentException if a referenced field is not part of the schema or a
         *     field is governed by more than one sequence group
         * @throws IllegalStateException (via {@code Preconditions.checkState}) if
         *     remove-record-on-delete is combined with ignore-delete or with sequence groups, or
         *     if the remove-record-on-sequence-group option names a field that is not a sequence
         *     field
         */
        private Factory(Options options, RowType rowType, List<String> primaryKeys) {
            this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
            this.rowType = rowType;
            this.tableTypes = rowType.getFieldTypes();
            this.removeRecordOnSequenceGroup = options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP);
            this.sequenceGroupPartialDelete = new HashSet<>();
            List<String> fieldNames = rowType.getFieldNames();
            this.fieldSeqComparators = new HashMap<>();
            // Sequence field name -> field index. Typed (not raw) so the index set derived from
            // it below is a genuine Set<Integer>.
            Map<String, Integer> sequenceGroupMap = new HashMap<>();
            List<String> allSequenceFields = new ArrayList<>();
            for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
                String k = entry.getKey();
                String v = entry.getValue();
                if (!k.startsWith("fields") || !k.endsWith(PartialUpdateMergeFunction.SEQUENCE_GROUP)) {
                    continue;
                }
                // Strip "fields." and ".sequence-group" to get the sequence field names.
                String sequenceFieldList = k.substring("fields".length() + 1, k.length() - PartialUpdateMergeFunction.SEQUENCE_GROUP.length() - 1);
                List<String> sequenceFields = Arrays.stream(sequenceFieldList.split(","))
                        .map(fieldName -> this.validateFieldName(fieldName, fieldNames))
                        .collect(Collectors.toList());
                allSequenceFields.addAll(sequenceFields);
                Supplier<FieldsComparator> userDefinedSeqComparator = () -> UserDefinedSeqComparator.create(rowType, sequenceFields, true);

                // Fields governed by this group; each field may belong to one group only.
                for (String governedName : v.split(",")) {
                    int governedIndex = fieldNames.indexOf(this.validateFieldName(governedName, fieldNames));
                    if (this.fieldSeqComparators.containsKey(governedIndex)) {
                        throw new IllegalArgumentException(String.format("Field %s is defined repeatedly by multiple groups: %s", fieldNames.get(governedIndex), k));
                    }
                    this.fieldSeqComparators.put(governedIndex, userDefinedSeqComparator);
                }

                // The sequence fields are merged through the same comparator.
                for (String sequenceField : sequenceFields) {
                    int sequenceIndex = fieldNames.indexOf(sequenceField);
                    this.fieldSeqComparators.put(sequenceIndex, userDefinedSeqComparator);
                    sequenceGroupMap.put(sequenceField, sequenceIndex);
                }
            }
            this.fieldAggregators = this.createFieldAggregators(rowType, primaryKeys, allSequenceFields, new CoreOptions(options));
            this.removeRecordOnDelete = options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
            Preconditions.checkState(!this.removeRecordOnDelete || !this.ignoreDelete, String.format("%s and %s have conflicting behavior so should not be enabled at the same time.", CoreOptions.IGNORE_DELETE, CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
            Preconditions.checkState(!this.removeRecordOnDelete || this.fieldSeqComparators.isEmpty(), String.format("sequence group and %s have conflicting behavior so should not be enabled at the same time.", CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
            if (this.removeRecordOnSequenceGroup != null) {
                String[] sequenceGroupArr = this.removeRecordOnSequenceGroup.split(",");
                Preconditions.checkState(sequenceGroupMap.keySet().containsAll(Arrays.asList(sequenceGroupArr)), String.format("field '%s' defined in '%s' option must be part of sequence groups", this.removeRecordOnSequenceGroup, CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key()));
                // The check above guarantees every name is a known sequence field, so the
                // lookup never yields null.
                this.sequenceGroupPartialDelete = Arrays.stream(sequenceGroupArr)
                        .map(sequenceGroupMap::get)
                        .collect(Collectors.toSet());
            }
        }

        /**
         * Instantiates a merge function, optionally for a projected row.
         *
         * <p>Without projection every comparator and aggregator supplier is evaluated and keyed
         * by table field index. With a projection, all index-based state is translated from
         * table field indexes to positions in the projected row: sequence comparators (for
         * projected fields only), aggregators, and the partial-delete sequence fields. Entries
         * whose field is not part of the projection are dropped.
         *
         * @param projection nested projection of the rows handed to the merge function, or
         *     {@code null} for the full row
         * @return a new merge function
         * @throws RuntimeException if a projected field belongs to a sequence group whose
         *     sequence field is missing from the projection
         */
        @Override
        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
            if (projection == null) {
                Map<Integer, FieldsComparator> comparators = new HashMap<>();
                this.fieldSeqComparators.forEach((field, supplier) -> comparators.put(field, supplier.get()));
                Map<Integer, FieldAggregator> aggregators = new HashMap<>();
                this.fieldAggregators.forEach((field, supplier) -> aggregators.put(field, supplier.get()));
                return new PartialUpdateMergeFunction(InternalRowUtils.createFieldGetters(this.tableTypes), this.ignoreDelete, comparators, aggregators, !comparators.isEmpty(), this.removeRecordOnDelete, this.sequenceGroupPartialDelete, ArrayUtils.toPrimitiveBoolean(this.rowType.getFieldTypes().stream().map(DataType::isNullable).toArray(Boolean[]::new)));
            }

            Projection topLevelProjection = Projection.of(projection);
            int[] projects = topLevelProjection.toTopLevelIndexes();

            // Table field index -> position in the projected row.
            Map<Integer, Integer> indexMap = new HashMap<>();
            List<DataField> dataFields = this.rowType.getFields();
            List<DataType> newDataTypes = new ArrayList<>();
            for (int i = 0; i < projects.length; ++i) {
                indexMap.put(projects[i], i);
                newDataTypes.add(dataFields.get(projects[i]).type());
            }
            RowType newRowType = RowType.builder().fields(newDataTypes).build();

            Map<Integer, FieldsComparator> projectedSeqComparators = new HashMap<>();
            this.fieldSeqComparators.forEach((field, comparatorSupplier) -> {
                Integer newField = indexMap.get(field);
                if (newField == null) {
                    // Field not projected: no need to even instantiate its comparator.
                    return;
                }
                int[] newSequenceFields = Arrays.stream(comparatorSupplier.get().compareFields()).map(index -> {
                    Integer newIndex = indexMap.get(index);
                    if (newIndex == null) {
                        throw new RuntimeException(String.format("Can not find new sequence field for new field. new field index is %s", newField));
                    }
                    return newIndex;
                }).toArray();
                projectedSeqComparators.put(newField, UserDefinedSeqComparator.create(newRowType, newSequenceFields, true));
            });

            Map<Integer, FieldAggregator> projectedAggregators = new HashMap<>();
            for (int i = 0; i < projects.length; ++i) {
                Supplier<FieldAggregator> aggregatorSupplier = this.fieldAggregators.get(projects[i]);
                if (aggregatorSupplier != null) {
                    projectedAggregators.put(i, aggregatorSupplier.get());
                }
            }

            // The merge function compares these against projected positions, so the table-level
            // indexes must be translated as well; unprojected sequence fields cannot match.
            Set<Integer> projectedPartialDelete = new HashSet<>();
            for (Integer tableIndex : this.sequenceGroupPartialDelete) {
                Integer projectedIndex = indexMap.get(tableIndex);
                if (projectedIndex != null) {
                    projectedPartialDelete.add(projectedIndex);
                }
            }

            List<DataType> projectedTypes = topLevelProjection.project(this.tableTypes);
            return new PartialUpdateMergeFunction(InternalRowUtils.createFieldGetters(projectedTypes), this.ignoreDelete, projectedSeqComparators, projectedAggregators, !this.fieldSeqComparators.isEmpty(), this.removeRecordOnDelete, projectedPartialDelete, ArrayUtils.toPrimitiveBoolean(projectedTypes.stream().map(DataType::isNullable).toArray(Boolean[]::new)));
        }

        /**
         * Extends a projection so that every projected field of a sequence group is read
         * together with the group's sequence fields.
         *
         * @param projection requested nested projection, or {@code null} for the full row
         * @return the projection unchanged (with a {@code null} outer projection) when no
         *     sequence group is configured or {@code projection} is {@code null}; otherwise a
         *     push-down projection made of the requested top-level fields followed by the
         *     missing sequence fields, plus an outer projection selecting the first
         *     {@code topProjects.length} positions again
         */
        @Override
        public MergeFunctionFactory.AdjustedProjection adjustProjection(@Nullable int[][] projection) {
            if (this.fieldSeqComparators.isEmpty()) {
                return new MergeFunctionFactory.AdjustedProjection(projection, null);
            }
            if (projection == null) {
                return new MergeFunctionFactory.AdjustedProjection(null, null);
            }

            int[] topProjects = Projection.of(projection).toTopLevelIndexes();
            Set<Integer> requested = Arrays.stream(topProjects).boxed().collect(Collectors.toSet());

            // Insertion-ordered so the appended sequence fields get stable positions.
            Set<Integer> extraFields = new LinkedHashSet<>();
            for (int index : topProjects) {
                Supplier<FieldsComparator> comparatorSupplier = this.fieldSeqComparators.get(index);
                if (comparatorSupplier == null) {
                    continue;
                }
                for (int sequenceField : comparatorSupplier.get().compareFields()) {
                    if (!requested.contains(sequenceField)) {
                        extraFields.add(sequenceField);
                    }
                }
            }

            int[] allProjects = Stream.concat(Arrays.stream(topProjects).boxed(), extraFields.stream()).mapToInt(Integer::intValue).toArray();
            int[][] pushDown = Projection.of(allProjects).toNestedIndexes();
            int[][] outer = Projection.of(IntStream.range(0, topProjects.length).toArray()).toNestedIndexes();
            return new MergeFunctionFactory.AdjustedProjection(pushDown, outer);
        }

        /**
         * Checks that a field name exists in the schema.
         *
         * @param fieldName name to look up
         * @param fieldNames all field names of the table
         * @return {@code fieldName} unchanged
         * @throws IllegalArgumentException if {@code fieldNames} does not contain
         *     {@code fieldName}
         */
        private String validateFieldName(String fieldName, List<String> fieldNames) {
            if (!fieldNames.contains(fieldName)) {
                throw new IllegalArgumentException(String.format("Field %s can not be found in table schema", fieldName));
            }
            return fieldName;
        }

        /**
         * Builds the aggregator suppliers, keyed by field index.
         *
         * <p>Sequence fields never get an aggregator. Primary key fields get the
         * {@code "primary-key"} aggregator. Any other field gets the aggregator named by
         * {@link #getAggFuncName}, if one is configured.
         *
         * @throws IllegalArgumentException (via {@code Preconditions.checkArgument}) if a field
         *     uses an aggregation function other than {@code last_non_null_value} without
         *     belonging to a sequence group
         */
        private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(RowType rowType, List<String> primaryKeys, List<String> allSequenceFields, CoreOptions options) {
            List<String> names = rowType.getFieldNames();
            List<DataType> types = rowType.getFieldTypes();
            Map<Integer, Supplier<FieldAggregator>> suppliers = new HashMap<Integer, Supplier<FieldAggregator>>();
            for (int pos = 0; pos < names.size(); pos++) {
                String name = names.get(pos);
                DataType type = types.get(pos);
                if (allSequenceFields.contains(name)) {
                    continue;
                }
                if (primaryKeys.contains(name)) {
                    suppliers.put(pos, () -> FieldAggregatorFactory.create(type, name, "primary-key", options));
                } else {
                    String funcName = this.getAggFuncName(options, name);
                    if (funcName != null) {
                        boolean allowed = funcName.equals("last_non_null_value") || this.fieldSeqComparators.containsKey(names.indexOf(name));
                        Preconditions.checkArgument(allowed, "Must use sequence group for aggregation functions but not found for field %s.", name);
                        suppliers.put(pos, () -> FieldAggregatorFactory.create(type, name, funcName, options));
                    }
                }
            }
            return suppliers;
        }

        /**
         * Resolves the aggregation function name of a field.
         *
         * @return the field-specific function from {@code options.fieldAggFunc(fieldName)} when
         *     it is non-null, otherwise {@code options.fieldsDefaultFunc()}
         */
        @Nullable
        private String getAggFuncName(CoreOptions options, String fieldName) {
            String fieldSpecific = options.fieldAggFunc(fieldName);
            if (fieldSpecific != null) {
                return fieldSpecific;
            }
            return options.fieldsDefaultFunc();
        }
    }
}

