package org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.logging;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.phoenix.shaded.com.google.gson.Gson;
import org.apache.phoenix.shaded.com.google.gson.GsonBuilder;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.base.Charsets;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.base.Preconditions;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.base.Stopwatch;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.collect.Iterables;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.collect.Lists;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.Futures;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.Service;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.api.logging.LogThrowable;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.common.Threads;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.Services;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.json.ILoggingEventSerializer;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.json.LogThrowableCodec;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.json.StackTraceElementCodec;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.kafka.client.Compression;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.kafka.client.KafkaClientService;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.kafka.client.KafkaPublisher;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.zookeeper.RetryStrategies;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientService;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientServices;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/phoenix/shaded/org/apache/tephra/shaded/org/apache/twill/internal/logging/KafkaAppender.class */
public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
    private LogEventConverter eventConverter;
    private ZKClientService zkClientService;
    private KafkaClientService kafkaClient;
    private String zkConnectStr;
    private String hostname;
    private String runnableName;
    private String topic;
    private ScheduledExecutorService scheduler;
    private int flushLimit = 20;
    private int flushPeriod = 100;
    private final AtomicReference<KafkaPublisher.Preparer> publisher = new AtomicReference<>();
    private final Runnable flushTask = createFlushTask();
    private final AtomicInteger bufferedSize = new AtomicInteger();
    private Queue<String> buffer = new ConcurrentLinkedQueue();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/phoenix/shaded/org/apache/tephra/shaded/org/apache/twill/internal/logging/KafkaAppender$LogEventConverter.class */
    public static final class LogEventConverter {
        private final Gson gson;

        private LogEventConverter(String str, String str2) {
            this.gson = new GsonBuilder().registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec()).registerTypeAdapter(LogThrowable.class, new LogThrowableCodec()).registerTypeAdapter(ILoggingEvent.class, new ILoggingEventSerializer(str, str2)).create();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String convert(ILoggingEvent iLoggingEvent) {
            return this.gson.toJson(iLoggingEvent, ILoggingEvent.class);
        }
    }

    public void setZookeeper(String str) {
        this.zkConnectStr = str;
    }

    public void setHostname(String str) {
        this.hostname = str;
    }

    public void setRunnableName(String str) {
        this.runnableName = str;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public void setFlushLimit(int i) {
        this.flushLimit = i;
    }

    public void setFlushPeriod(int i) {
        this.flushPeriod = i;
    }

    public void start() {
        Preconditions.checkNotNull(this.zkConnectStr);
        this.eventConverter = new LogEventConverter(this.hostname, this.runnableName);
        this.scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
        this.zkClientService = ZKClientServices.delegate(ZKClients.reWatchOnExpire(ZKClients.retryOnFailure(ZKClientService.Builder.of(this.zkConnectStr).build(), RetryStrategies.fixDelay(1L, TimeUnit.SECONDS))));
        this.kafkaClient = new ZKKafkaClientService(this.zkClientService);
        Futures.addCallback(Services.chainStart(this.zkClientService, this.kafkaClient), new FutureCallback<List<ListenableFuture<Service.State>>>() { // from class: org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.logging.KafkaAppender.1
            @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(List<ListenableFuture<Service.State>> list) {
                Iterator<ListenableFuture<Service.State>> it = list.iterator();
                while (it.hasNext()) {
                    Preconditions.checkState(Futures.getUnchecked(it.next()) == Service.State.RUNNING, "Service is not running.");
                }
                KafkaAppender.LOG.info("Kafka client started: " + KafkaAppender.this.zkConnectStr);
                KafkaAppender.this.scheduler.scheduleWithFixedDelay(KafkaAppender.this.flushTask, 0L, KafkaAppender.this.flushPeriod, TimeUnit.MILLISECONDS);
            }

            @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                KafkaAppender.LOG.error("Failed to start kafka appender.", th);
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        super.start();
    }

    public void stop() {
        super.stop();
        this.scheduler.shutdownNow();
        Futures.getUnchecked(Services.chainStop(this.kafkaClient, this.zkClientService));
    }

    public void forceFlush() {
        try {
            this.scheduler.submit(this.flushTask).get(2L, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOG.error("Failed to force log flush in 2 seconds.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void append(ILoggingEvent iLoggingEvent) {
        this.buffer.offer(this.eventConverter.convert(iLoggingEvent));
        if (this.bufferedSize.incrementAndGet() < this.flushLimit || this.publisher.get() == null) {
            return;
        }
        this.scheduler.submit(this.flushTask);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int publishLogs(long j, TimeUnit timeUnit) throws TimeoutException {
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(this.bufferedSize.get());
        Iterator it = Iterables.consumingIterable(this.buffer).iterator();
        while (it.hasNext()) {
            newArrayListWithExpectedSize.add(Charsets.UTF_8.encode((String) it.next()));
        }
        long nanos = timeUnit.toNanos(j) / 10;
        if (nanos <= 0) {
            nanos = 1;
        }
        try {
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.start();
            long j2 = j;
            do {
                try {
                    int intValue = doPublishLogs(newArrayListWithExpectedSize).get(j2, timeUnit).intValue();
                    this.bufferedSize.addAndGet(-intValue);
                    return intValue;
                } catch (ExecutionException e) {
                    LOG.error("Failed to publish logs to Kafka.", e);
                    TimeUnit.NANOSECONDS.sleep(nanos);
                    j2 -= stopwatch.elapsedTime(timeUnit);
                    stopwatch.reset();
                    stopwatch.start();
                }
            } while (j2 > 0);
            return 0;
        } catch (InterruptedException e2) {
            LOG.warn("Logs publish to Kafka interrupted.", e2);
            return 0;
        }
    }

    private ListenableFuture<Integer> doPublishLogs(Collection<ByteBuffer> collection) {
        if (collection.isEmpty()) {
            return Futures.immediateFuture(0);
        }
        KafkaPublisher.Preparer preparer = this.publisher.get();
        if (preparer == null) {
            try {
                this.publisher.compareAndSet(null, this.kafkaClient.getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED, Compression.SNAPPY).prepare(this.topic));
                preparer = this.publisher.get();
            } catch (Exception e) {
                return Futures.immediateFailedFuture(e);
            }
        }
        Iterator<ByteBuffer> it = collection.iterator();
        while (it.hasNext()) {
            preparer.add(it.next(), 0);
        }
        return preparer.send();
    }

    private Runnable createFlushTask() {
        return new Runnable() { // from class: org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.logging.KafkaAppender.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    int publishLogs = KafkaAppender.this.publishLogs(2L, TimeUnit.SECONDS);
                    if (KafkaAppender.LOG.isDebugEnabled()) {
                        KafkaAppender.LOG.debug("Published {} log messages to Kafka.", Integer.valueOf(publishLogs));
                    }
                } catch (Exception e) {
                    KafkaAppender.LOG.error("Failed to push logs to Kafka. Log entries dropped.", e);
                }
            }
        };
    }
}
