/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.mergetree.compact;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.Projection;

/**
 * A {@link MergeFunction} that folds every {@link KeyValue} added between {@link #reset()} and
 * {@link #getResult()} into a single row, field by field.
 *
 * <p>Without sequence groups, a non-null field of an added record overwrites the accumulated
 * field. With sequence groups, a field that belongs to a group is only overwritten when the
 * sequence value of the incoming record (as produced by the group's {@link SequenceGenerator}) is
 * not smaller than the sequence value currently held by the accumulated row.
 */
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {

    /**
     * Suffix of the option keys that declare a sequence group. The factory recognises keys that
     * start with {@code fields} and end with this suffix; the part in between is the name of the
     * sequence field and the option value is a comma separated list of the grouped field names.
     */
    public static final String SEQUENCE_GROUP = "sequence-group";

    private final InternalRow.FieldGetter[] getters;
    private final boolean ignoreDelete;

    /** Field index -> generator of the sequence group the field belongs to. */
    private final Map<Integer, SequenceGenerator> fieldSequences;

    private InternalRow currentKey;
    private long latestSequenceNumber;
    private boolean isEmpty;
    private GenericRow row;
    private KeyValue reused;

    /**
     * @param getters one getter per value field, in field order
     * @param ignoreDelete whether retract records are silently dropped
     * @param fieldSequences field index to sequence generator mapping; empty when no sequence
     *     group is configured
     */
    protected PartialUpdateMergeFunction(
            InternalRow.FieldGetter[] getters,
            boolean ignoreDelete,
            Map<Integer, SequenceGenerator> fieldSequences) {
        this.getters = getters;
        this.ignoreDelete = ignoreDelete;
        this.fieldSequences = fieldSequences;
    }

    /** Clears the current key and starts accumulating into a fresh, all-null row. */
    @Override
    public void reset() {
        currentKey = null;
        row = new GenericRow(getters.length);
        isEmpty = true;
    }

    /**
     * Merges one record into the accumulated row.
     *
     * @param kv the record to merge
     * @throws IllegalArgumentException if {@code kv} is a retract record while deletes are not
     *     ignored and no sequence group is available to retract with
     */
    @Override
    public void add(KeyValue kv) {
        // The key is refreshed even for records that end up being ignored.
        currentKey = kv.key();

        if (kv.valueKind().isRetract()) {
            if (ignoreDelete) {
                return;
            }
            if (fieldSequences.size() > 1) {
                retractWithSequenceGroup(kv);
                return;
            }
            // The first argument of String.join is the delimiter, so the headline has to be
            // passed as an element; otherwise it ends up wedged between the two options.
            String msg =
                    String.join(
                            "\n",
                            "By default, Partial update can not accept delete records,"
                                    + " you can choose one of the following solutions:",
                            "1. Configure 'partial-update.ignore-delete' to ignore delete records.",
                            "2. Configure 'sequence-group's to retract partial columns.");
            throw new IllegalArgumentException(msg);
        }

        latestSequenceNumber = kv.sequenceNumber();
        isEmpty = false;
        if (fieldSequences.isEmpty()) {
            updateNonNullFields(kv);
        } else {
            updateWithSequenceGroup(kv);
        }
    }

    /** Overwrites every accumulated field for which {@code kv} carries a non-null value. */
    private void updateNonNullFields(KeyValue kv) {
        for (int i = 0; i < getters.length; i++) {
            Object field = getters[i].getFieldOrNull(kv.value());
            if (field != null) {
                row.setField(i, field);
            }
        }
    }

    /**
     * Merges {@code kv} honouring sequence groups: fields outside any group follow the non-null
     * overwrite rule, grouped fields are overwritten (possibly with null) only when the incoming
     * sequence value is present and not smaller than the accumulated one.
     */
    private void updateWithSequenceGroup(KeyValue kv) {
        for (int i = 0; i < getters.length; i++) {
            Object field = getters[i].getFieldOrNull(kv.value());
            SequenceGenerator sequenceGen = fieldSequences.get(i);
            if (sequenceGen == null) {
                if (field != null) {
                    row.setField(i, field);
                }
                continue;
            }

            Long currentSeq = sequenceGen.generateNullable(kv.value());
            if (currentSeq == null) {
                continue;
            }
            Long previousSeq = sequenceGen.generateNullable(row);
            if (previousSeq == null || currentSeq >= previousSeq) {
                row.setField(i, field);
            }
        }
    }

    /**
     * Applies a retract record using sequence groups: for every grouped field whose incoming
     * sequence value is present and not smaller than the accumulated one, the sequence field
     * itself takes the incoming value while the other fields of the group are set to null.
     * Fields outside any group are left untouched.
     */
    private void retractWithSequenceGroup(KeyValue kv) {
        for (int i = 0; i < getters.length; i++) {
            SequenceGenerator sequenceGen = fieldSequences.get(i);
            if (sequenceGen == null) {
                continue;
            }
            Long currentSeq = sequenceGen.generateNullable(kv.value());
            if (currentSeq == null) {
                continue;
            }
            Long previousSeq = sequenceGen.generateNullable(row);
            if (previousSeq != null && currentSeq < previousSeq) {
                continue;
            }

            if (sequenceGen.index() == i) {
                // This is the sequence field itself: keep it in step with the retract record.
                row.setField(i, getters[i].getFieldOrNull(kv.value()));
            } else {
                row.setField(i, null);
            }
        }
    }

    /**
     * @return the merged record as an {@link RowKind#INSERT} carrying the sequence number of the
     *     last non-retract record added, or {@code null} when no non-retract record has been
     *     added since the last {@link #reset()}. The returned object is reused across calls.
     */
    @Override
    @Nullable
    public KeyValue getResult() {
        if (isEmpty) {
            return null;
        }
        if (reused == null) {
            reused = new KeyValue();
        }
        return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row);
    }

    /** Creates a factory that builds merge functions for {@code rowType} from {@code options}. */
    public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType) {
        return new Factory(options, rowType);
    }

    /** Factory that parses the sequence group options once and creates merge functions. */
    private static class Factory implements MergeFunctionFactory<KeyValue> {

        private static final long serialVersionUID = 1L;

        private final boolean ignoreDelete;
        private final List<DataType> tableTypes;
        private final Map<Integer, SequenceGenerator> fieldSequences;

        /**
         * @throws IllegalArgumentException if a sequence group references a field that is not
         *     part of {@code rowType}, or if a field is listed by more than one group
         */
        private Factory(Options options, RowType rowType) {
            this.ignoreDelete = options.get(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE);
            this.tableTypes = rowType.getFieldTypes();
            this.fieldSequences = new HashMap<>();

            List<String> fieldNames = rowType.getFieldNames();
            for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
                String k = entry.getKey();
                String v = entry.getValue();
                if (!k.startsWith("fields") || !k.endsWith(SEQUENCE_GROUP)) {
                    continue;
                }

                // Strip the "fields" prefix and the SEQUENCE_GROUP suffix together with their
                // one-character separators; what remains is the sequence field name.
                String sequenceFieldName =
                        k.substring(
                                "fields".length() + 1, k.length() - SEQUENCE_GROUP.length() - 1);
                SequenceGenerator sequenceGen = new SequenceGenerator(sequenceFieldName, rowType);

                for (String fieldName : v.split(",")) {
                    int field = fieldNames.indexOf(fieldName);
                    if (field == -1) {
                        // Fail fast instead of registering the bogus index -1.
                        throw new IllegalArgumentException(
                                String.format(
                                        "Field %s can not be found in table schema, it is referenced by: %s",
                                        fieldName, k));
                    }
                    if (fieldSequences.containsKey(field)) {
                        throw new IllegalArgumentException(
                                String.format(
                                        "Field %s is defined repeatedly by multiple groups: %s",
                                        fieldNames.get(field), k));
                    }
                    fieldSequences.put(field, sequenceGen);
                }

                // The sequence field is governed by its own generator as well.
                fieldSequences.put(sequenceGen.index(), sequenceGen);
            }
        }

        /**
         * Creates a merge function, remapping the sequence groups onto the projected field
         * positions when a projection is given.
         *
         * @param projection the projection applied to the value row, or {@code null} for none
         * @throws RuntimeException if a projected field belongs to a sequence group whose
         *     sequence field is not part of the projection
         */
        @Override
        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
            if (projection == null) {
                return new PartialUpdateMergeFunction(
                        InternalRowUtils.createFieldGetters(tableTypes),
                        ignoreDelete,
                        fieldSequences);
            }

            int[] projects = Projection.of(projection).toTopLevelIndexes();
            // Original field index -> position inside the projected row.
            Map<Integer, Integer> indexMap = new HashMap<>();
            for (int i = 0; i < projects.length; i++) {
                indexMap.put(projects[i], i);
            }

            Map<Integer, SequenceGenerator> projectedSequences = new HashMap<>();
            fieldSequences.forEach(
                    (field, sequence) -> {
                        int newField = indexMap.getOrDefault(field, -1);
                        if (newField == -1) {
                            // The field is projected away, nothing to track for it.
                            return;
                        }
                        int newSequenceId = indexMap.getOrDefault(sequence.index(), -1);
                        if (newSequenceId == -1) {
                            throw new RuntimeException(
                                    String.format(
                                            "Can not find new sequence field for new field. new field index is %s",
                                            newField));
                        }
                        projectedSequences.put(
                                newField,
                                new SequenceGenerator(newSequenceId, sequence.fieldType()));
                    });

            return new PartialUpdateMergeFunction(
                    InternalRowUtils.createFieldGetters(
                            Projection.of(projection).project(tableTypes)),
                    ignoreDelete,
                    projectedSequences);
        }

        /**
         * Extends the projection with the sequence fields required by the projected fields.
         *
         * <p>The pushdown projection is the requested top-level fields followed by every missing
         * sequence field; the outer projection selects the first {@code n} fields again, where
         * {@code n} is the number of requested fields. When no sequence group is configured the
         * projection is returned unchanged with no outer projection.
         */
        @Override
        public MergeFunctionFactory.AdjustedProjection adjustProjection(
                @Nullable int[][] projection) {
            if (fieldSequences.isEmpty()) {
                return new MergeFunctionFactory.AdjustedProjection(projection, null);
            }
            if (projection == null) {
                return new MergeFunctionFactory.AdjustedProjection(null, null);
            }

            int[] topProjects = Projection.of(projection).toTopLevelIndexes();
            Set<Integer> indexSet = Arrays.stream(topProjects).boxed().collect(Collectors.toSet());

            // Insertion-ordered so that the appended sequence fields have a stable position.
            Set<Integer> extraFields = new LinkedHashSet<>();
            for (int index : topProjects) {
                SequenceGenerator generator = fieldSequences.get(index);
                if (generator != null && !indexSet.contains(generator.index())) {
                    extraFields.add(generator.index());
                }
            }

            int[] allProjects =
                    Stream.concat(Arrays.stream(topProjects).boxed(), extraFields.stream())
                            .mapToInt(Integer::intValue)
                            .toArray();

            int[][] pushdown = Projection.of(allProjects).toNestedIndexes();
            int[][] outer =
                    Projection.of(IntStream.range(0, topProjects.length).toArray())
                            .toNestedIndexes();
            return new MergeFunctionFactory.AdjustedProjection(pushdown, outer);
        }
    }
}

