package org.apache.flink.tests.util.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

/**
 * Test helper for talking to the Kafka broker exposed by a {@link KafkaContainer}.
 *
 * <p>Each operation creates its own short-lived Kafka client configured with the container's
 * bootstrap servers and closes that client before returning, whether the operation succeeded or
 * failed.
 */
public class KafkaContainerClient {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaContainerClient.class);

    private final KafkaContainer container;

    /**
     * Creates a client bound to the given container.
     *
     * @param kafkaContainer container whose bootstrap servers are used for every operation
     */
    public KafkaContainerClient(KafkaContainer kafkaContainer) {
        this.container = kafkaContainer;
    }

    /**
     * Creates a topic and blocks until the broker has acknowledged the creation.
     *
     * @param replicationFactor replication factor of the new topic (narrowed to {@code short})
     * @param numPartitions number of partitions of the new topic
     * @param topic name of the topic to create
     * @throws IllegalStateException if the topic could not be created; the underlying failure is
     *     attached as the cause
     */
    public void createTopic(int replicationFactor, int numPartitions, String topic) {
        Map<String, Object> adminConfig = new HashMap<>();
        adminConfig.put("bootstrap.servers", container.getBootstrapServers());
        // try-with-resources guarantees the admin client is closed on the failure path as well.
        try (AdminClient admin = AdminClient.create(adminConfig)) {
            NewTopic newTopic = new NewTopic(topic, numPartitions, (short) replicationFactor);
            admin.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers, since it is reported as an unchecked exception.
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException(
                    String.format(
                            "Fail to create topic [%s partitions: %d replication factor: %d].",
                            topic, numPartitions, replicationFactor),
                    e);
        }
    }

    /**
     * Sends the given values to a topic as key-less records, using {@code acks=all}.
     *
     * <p>The producer is closed before this method returns. The futures returned by
     * {@code send} are not inspected here.
     *
     * @param str name of the target topic
     * @param serializer serializer for the record values
     * @param tArr values to send, in order
     * @param <T> type of the record values
     */
    public <T> void sendMessages(String str, Serializer<T> serializer, T... tArr) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", container.getBootstrapServers());
        producerProps.put("acks", "all");

        try (KafkaProducer<Bytes, T> producer =
                new KafkaProducer<>(producerProps, new BytesSerializer(), serializer)) {
            for (T message : tArr) {
                producer.send(new ProducerRecord<>(str, message));
            }
        }
    }

    /**
     * Reads record values from the beginning of a topic until the expected number has arrived.
     *
     * <p>Waits up to 60 seconds for the topic to become visible, then polls in one-second steps
     * for up to 120 seconds.
     *
     * @param expectedNumMessages exact number of messages that must be read
     * @param groupId consumer group id to use
     * @param topic name of the topic to read
     * @param deserializer deserializer for the record values
     * @param <T> type of the record values
     * @return a synchronized list holding exactly {@code expectedNumMessages} values
     * @throws IOException if the number of messages read differs from
     *     {@code expectedNumMessages} once polling stops
     * @throws Exception if waiting for the topic fails or the consumer reports an error
     */
    public <T> List<T> readMessages(
            int expectedNumMessages, String groupId, String topic, Deserializer<T> deserializer)
            throws Exception {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", container.getBootstrapServers());
        consumerProps.put("group.id", groupId);
        consumerProps.put("enable.auto.commit", "true");
        consumerProps.put("auto.offset.reset", "earliest");

        final List<T> messages =
                Collections.synchronizedList(new ArrayList<>(expectedNumMessages));

        try (Consumer<Bytes, T> consumer =
                new KafkaConsumer<>(consumerProps, new BytesDeserializer(), deserializer)) {
            waitUntilTopicAvailableThenAssign(topic, consumer, Duration.ofSeconds(60L));

            // Keep polling until enough messages have arrived or the deadline expires.
            final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120L));
            while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) {
                LOG.info(
                        "Waiting for messages. Received {}/{}.",
                        messages.size(),
                        expectedNumMessages);
                for (ConsumerRecord<Bytes, T> record : consumer.poll(Duration.ofMillis(1000L))) {
                    messages.add(record.value());
                }
            }

            // A single poll may overshoot, so anything other than an exact match is an error.
            if (messages.size() != expectedNumMessages) {
                throw new IOException("Could not read expected number of messages.");
            }
            return messages;
        }
    }

    /**
     * Waits until the topic is listed by the broker, then assigns all of its partitions to the
     * consumer and rewinds them to the beginning.
     *
     * @param topic name of the topic to wait for
     * @param consumer consumer that receives the partition assignment
     * @param timeout maximum time to wait for the topic to show up
     * @throws IllegalStateException if no partition information is returned for the topic after
     *     the wait succeeded
     * @throws Exception if the topic does not show up within {@code timeout}
     */
    private void waitUntilTopicAvailableThenAssign(
            String topic, Consumer<?, ?> consumer, Duration timeout) throws Exception {
        CommonTestUtils.waitUtil(
                () -> consumer.listTopics(Duration.ofSeconds(1L)).containsKey(topic),
                timeout,
                String.format("Cannot get information for topic \"%s\" within timeout", topic));

        List<PartitionInfo> partitionInfos = consumer.listTopics().get(topic);
        if (partitionInfos == null) {
            // The topic was listed a moment ago; fail with a clear message instead of an NPE.
            throw new IllegalStateException(
                    String.format("No partition information returned for topic \"%s\"", topic));
        }

        List<TopicPartition> partitions = new ArrayList<>(partitionInfos.size());
        for (PartitionInfo info : partitionInfos) {
            partitions.add(new TopicPartition(topic, info.partition()));
        }
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
    }
}
