/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ambari.metrics.core.timeline.sink;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink;
import org.apache.ambari.metrics.core.timeline.sink.ExternalSinkProvider;
import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSinkProvider
implements ExternalSinkProvider {
    private static String TOPIC_NAME = "obdp-metrics-topic";
    private static final Log LOG = LogFactory.getLog(KafkaSinkProvider.class);
    private Producer producer;
    private int TIMEOUT_SECONDS = 10;
    private int FLUSH_SECONDS = 3;
    ObjectMapper objectMapper = new ObjectMapper();

    public KafkaSinkProvider() {
        TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();
        Properties configProperties = new Properties();
        try {
            configProperties.put("bootstrap.servers", configuration.getMetricsConf().getTrimmed("timeline.metrics.external.sink.kafka.bootstrap.servers"));
            configProperties.put("acks", configuration.getMetricsConf().getTrimmed("timeline.metrics.external.sink.kafka.acks", "all"));
            configProperties.put("retries", (Object)configuration.getMetricsConf().getInt("timeline.metrics.external.sink.kafka.bootstrap.retries", 0));
            configProperties.put("batch.size", (Object)configuration.getMetricsConf().getInt("timeline.metrics.external.sink.kafka.batch.size", 128));
            configProperties.put("linger.ms", (Object)configuration.getMetricsConf().getInt("timeline.metrics.external.sink.kafka.linger.ms", 1));
            configProperties.put("buffer.memory", (Object)configuration.getMetricsConf().getLong("timeline.metrics.external.sink.kafka.buffer.memory", 0x2000000L));
            this.FLUSH_SECONDS = configuration.getMetricsConf().getInt("timeline.metrics.cache.commit.interval", 3);
            this.TIMEOUT_SECONDS = configuration.getMetricsConf().getInt("timeline.metrics.external.sink.kafka.timeout.seconds", 10);
        }
        catch (Exception e) {
            LOG.error((Object)"Configuration error!", (Throwable)e);
            throw new ExceptionInInitializerError(e);
        }
        configProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        configProperties.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer");
        this.producer = new KafkaProducer(configProperties);
    }

    @Override
    public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
        switch (sourceName) {
            case RAW_METRICS: {
                return new KafkaRawMetricsSink();
            }
        }
        throw new UnsupportedOperationException("Provider does not support the expected source " + sourceName);
    }

    class KafkaRawMetricsSink
    implements ExternalMetricsSink {
        KafkaRawMetricsSink() {
        }

        @Override
        public int getSinkTimeOutSeconds() {
            return KafkaSinkProvider.this.TIMEOUT_SECONDS;
        }

        @Override
        public int getFlushSeconds() {
            return KafkaSinkProvider.this.FLUSH_SECONDS;
        }

        @Override
        public void sinkMetricData(Collection<TimelineMetrics> metrics) {
            JsonNode jsonNode = KafkaSinkProvider.this.objectMapper.valueToTree(metrics);
            ProducerRecord rec = new ProducerRecord(TOPIC_NAME, (Object)jsonNode);
            Future f = KafkaSinkProvider.this.producer.send(rec);
        }
    }
}

