/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.ksql;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.ksql.KSQLResponse;
import org.apache.zeppelin.ksql.KSQLRestService;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KSQLInterpreter
extends Interpreter {
    private static final String NEW_LINE = "\n";
    private static final Logger LOGGER = LoggerFactory.getLogger(KSQLInterpreter.class);
    public static final String TABLE_DELIMITER = "\t";
    private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
    private final KSQLRestService ksqlRestService;
    private static final ObjectMapper json = new ObjectMapper();

    public KSQLInterpreter(Properties properties) {
        this(properties, new KSQLRestService(properties.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue() != null ? e.getValue().toString() : null))));
    }

    public KSQLInterpreter(Properties properties, KSQLRestService ksqlRestService) {
        super(properties);
        this.ksqlRestService = ksqlRestService;
    }

    public void open() throws InterpreterException {
    }

    public void close() throws InterpreterException {
        this.ksqlRestService.close();
    }

    private String writeValueAsString(Object data) {
        try {
            if (data instanceof Collection || data instanceof Map) {
                return json.writeValueAsString(data);
            }
            if (data instanceof String) {
                return (String)data;
            }
            return String.valueOf(data);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    private void checkResponseErrors(String message) throws IOException {
        if (StringUtils.isNotBlank((CharSequence)message)) {
            this.interpreterOutput.getInterpreterOutput().write("%text");
            this.interpreterOutput.getInterpreterOutput().write(NEW_LINE);
            this.interpreterOutput.getInterpreterOutput().write(message);
        }
    }

    public InterpreterResult interpret(String query, InterpreterContext context) throws InterpreterException {
        if (StringUtils.isBlank((CharSequence)query)) {
            return new InterpreterResult(InterpreterResult.Code.SUCCESS);
        }
        this.interpreterOutput.setInterpreterOutput(context.out);
        try {
            this.interpreterOutput.getInterpreterOutput().flush();
            this.interpreterOutput.getInterpreterOutput().write("%table");
            this.interpreterOutput.getInterpreterOutput().write(NEW_LINE);
            LinkedHashSet<String> header = new LinkedHashSet<String>();
            this.executeQuery(context.getParagraphId(), query.trim(), header);
            return new InterpreterResult(InterpreterResult.Code.SUCCESS);
        }
        catch (IOException e) {
            return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
        }
    }

    private void executeQuery(String paragraphId, String query, Set<String> header) throws IOException {
        AtomicBoolean isFirstLine = new AtomicBoolean(true);
        this.ksqlRestService.executeQuery(paragraphId, query, (KSQLResponse resp) -> {
            try {
                if (resp.getRow() == null || resp.getRow().isEmpty()) {
                    return;
                }
                if (isFirstLine.get()) {
                    isFirstLine.set(false);
                    header.addAll(resp.getRow().keySet());
                    this.interpreterOutput.getInterpreterOutput().write(header.stream().collect(Collectors.joining(TABLE_DELIMITER)));
                    this.interpreterOutput.getInterpreterOutput().write(NEW_LINE);
                }
                this.interpreterOutput.getInterpreterOutput().write(resp.getRow().values().stream().map(this::writeValueAsString).collect(Collectors.joining(TABLE_DELIMITER)));
                this.interpreterOutput.getInterpreterOutput().write(NEW_LINE);
                this.checkResponseErrors(resp.getFinalMessage());
                this.checkResponseErrors(resp.getErrorMessage());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void cancel(InterpreterContext context) throws InterpreterException {
        LOGGER.info("Trying to cancel paragraphId {}", (Object)context.getParagraphId());
        try {
            this.ksqlRestService.closeClient(context.getParagraphId());
            LOGGER.info("Removed");
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public Interpreter.FormType getFormType() throws InterpreterException {
        return Interpreter.FormType.SIMPLE;
    }

    public int getProgress(InterpreterContext context) throws InterpreterException {
        return 0;
    }

    public Scheduler getScheduler() {
        return SchedulerFactory.singleton().createOrGetFIFOScheduler(KSQLInterpreter.class.getName() + ((Object)((Object)this)).hashCode());
    }
}

