package org.apache.zeppelin.flink.sql;

import java.io.IOException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.apache.zeppelin.flink.FlinkShims;
import org.apache.zeppelin.flink.JobManager;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.tabledata.TableDataUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/flink/sql/AppendStreamSqlJob.class */
/**
 * Streaming SQL job that accumulates inserted rows and renders them as a
 * Zeppelin {@code %table} result of type {@code "ts"}.
 *
 * <p>Rows are only ever appended; a delete is treated as an error. On every
 * {@link #buildResult()} the accumulated rows are ordered by the normalized string
 * form of their first field and then trimmed to a window ending at the last row
 * of that ordering. The first field is cast to a time value
 * ({@link LocalDateTime} or {@link Timestamp}, depending on the Flink version
 * reported by the shims), so it is expected to be the time column.
 */
public class AppendStreamSqlJob extends AbstractStreamSqlJob {
    // Log under this class's own name so messages are attributed correctly.
    private static final Logger LOGGER = LoggerFactory.getLogger(AppendStreamSqlJob.class);

    /** Rows received so far that are still inside the retention window. */
    private final List<Row> materializedTable;

    /**
     * Width of the retention window. It is subtracted from the result of
     * {@link Timestamp#getTime()}, i.e. it is interpreted as milliseconds.
     */
    private final long tsWindowThreshold;

    /**
     * Creates the job and reads the retention window from the paragraph's local
     * property {@code "threshold"} (default {@code "3600000"}).
     *
     * <p>All arguments are forwarded unchanged to the superclass constructor; their
     * meaning is defined there.
     *
     * @throws NumberFormatException if the {@code "threshold"} property is not a valid long
     */
    public AppendStreamSqlJob(StreamExecutionEnvironment streamExecutionEnvironment, TableEnvironment tableEnvironment, JobManager jobManager, InterpreterContext interpreterContext, int i, FlinkShims flinkShims) {
        super(streamExecutionEnvironment, tableEnvironment, jobManager, interpreterContext, i, flinkShims);
        this.materializedTable = new ArrayList<>();
        this.tsWindowThreshold = Long.parseLong((String) interpreterContext.getLocalProperties().getOrDefault("threshold", "3600000"));
    }

    /** Returns the fixed type identifier of this job, {@code "ts"}. */
    @Override // org.apache.zeppelin.flink.sql.AbstractStreamSqlJob
    protected String getType() {
        return "ts";
    }

    /** Intentionally performs no validation: every schema is accepted. */
    @Override // org.apache.zeppelin.flink.sql.AbstractStreamSqlJob
    protected void checkTableSchema(TableSchema tableSchema) throws Exception {
    }

    /** Appends the inserted row to the accumulated table. */
    @Override // org.apache.zeppelin.flink.sql.AbstractStreamSqlJob
    protected void processInsert(Row row) {
        LOGGER.debug("processInsert: {}", row);
        this.materializedTable.add(row);
    }

    /**
     * Always fails, because this job only supports appended rows.
     *
     * @throws RuntimeException always
     */
    @Override // org.apache.zeppelin.flink.sql.AbstractStreamSqlJob
    protected void processDelete(Row row) {
        throw new RuntimeException("Delete operation is not expected");
    }

    /**
     * Renders the accumulated rows as a {@code %table} block followed by an empty
     * {@code %text} block.
     *
     * <p>Side effect: sorts the accumulated rows and permanently drops every row
     * whose time value is not strictly greater than {@code latest - threshold},
     * where {@code latest} is the time value of the last row after sorting.
     *
     * @return the text to write to the interpreter output
     */
    @Override // org.apache.zeppelin.flink.sql.AbstractStreamSqlJob
    protected String buildResult() {
        StringBuilder sb = new StringBuilder();
        sb.append("%table\n");
        // Header line: tab-separated field names.
        sb.append(String.join("\t", this.schema.getFieldNames()));
        sb.append("\n");

        // Ordering is done on the normalized string rendering of the first field.
        this.materializedTable.sort((left, right) -> sortKey(left).compareTo(sortKey(right)));

        if (!this.materializedTable.isEmpty()) {
            // The runtime type of the time field depends on the Flink version; ask once.
            final boolean localDateTime = this.flinkShims.getFlinkVersion().isAfterFlink114();
            final Row lastRow = this.materializedTable.get(this.materializedTable.size() - 1);
            final long cutoff = toEpochMillis(lastRow.getField(0), localDateTime) - this.tsWindowThreshold;
            // Keep only rows strictly newer than the cutoff.
            this.materializedTable.removeIf(row -> toEpochMillis(row.getField(0), localDateTime) <= cutoff);
            sb.append(tableToString(this.materializedTable));
        }
        sb.append("\n%text ");
        return sb.toString();
    }

    /**
     * Clears the interpreter output and writes a freshly built result to it.
     * An {@link IOException} while writing is logged and not propagated.
     */
    @Override // org.apache.zeppelin.flink.sql.AbstractStreamSqlJob
    protected void refresh(InterpreterContext interpreterContext) {
        interpreterContext.out().clear(false);
        try {
            String buildResult = buildResult();
            interpreterContext.out.write(buildResult);
            interpreterContext.out.flush();
            LOGGER.debug("Refresh with data: {}", buildResult);
        } catch (IOException e) {
            LOGGER.error("Fail to refresh data", e);
        }
    }

    /** Returns the normalized string form of the row's first field, used as the sort key. */
    private static String sortKey(Row row) {
        return TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(row.getField(0)));
    }

    /**
     * Converts a time field to epoch milliseconds.
     *
     * @param timeField the first field of a row
     * @param localDateTime {@code true} to treat the field as a {@link LocalDateTime},
     *     {@code false} to treat it as a {@link Timestamp}
     * @throws ClassCastException if the field is not of the selected type
     */
    private static long toEpochMillis(Object timeField, boolean localDateTime) {
        if (localDateTime) {
            return Timestamp.valueOf((LocalDateTime) timeField).getTime();
        }
        return ((Timestamp) timeField).getTime();
    }
}
