package org.apache.atlas.kafka.bridge;

import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.kafka.model.KafkaDataTypes;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.utils.AtlasConfigurationUtil;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.KafkaUtils;
import org.apache.avro.Schema;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/kafka/bridge/KafkaBridge.class */
public class KafkaBridge {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
    private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE = System.getenv("KAFKA_SCHEMA_REGISTRY");
    public static String KAFKA_SCHEMA_REGISTRY_HOSTNAME = "localhost";
    private static final int EXIT_CODE_SUCCESS = 0;
    private static final int EXIT_CODE_FAILED = 1;
    private static final String ATLAS_ENDPOINT = "atlas.rest.address";
    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
    private static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
    private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace";
    private static final String DEFAULT_CLUSTER_NAME = "primary";
    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
    private static final String DESCRIPTION_ATTR = "description";
    private static final String PARTITION_COUNT = "partitionCount";
    private static final String REPLICATION_FACTOR = "replicationFactor";
    private static final String NAME = "name";
    private static final String URI = "uri";
    private static final String CLUSTERNAME = "clusterName";
    private static final String TOPIC = "topic";
    private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
    private static final String TYPE = "type";
    private static final String NAMESPACE = "namespace";
    private static final String FIELDS = "fields";
    private static final String AVRO_SCHEMA = "avroSchema";
    private static final String SCHEMA_VERSION_ID = "versionId";
    private static final String FORMAT_KAKFA_SCHEMA_QUALIFIED_NAME = "%s@%s@%s";
    private static final String FORMAT_KAKFA_FIELD_QUALIFIED_NAME = "%s@%s@%s@%s";
    private final List<String> availableTopics;
    private final String metadataNamespace;
    private final AtlasClientV2 atlasClientV2;
    private final KafkaUtils kafkaUtils;
    private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();

    public static void main(String[] strArr) {
        AtlasClientV2 atlasClientV2;
        int i = EXIT_CODE_FAILED;
        AtlasClientV2 atlasClientV22 = null;
        KafkaUtils kafkaUtils = null;
        CloseableHttpClient closeableHttpClient = null;
        System.out.print("\n################################\n");
        System.out.print("# Custom Kafka bridge #\n");
        System.out.print("################################\n\n");
        try {
            try {
                Options options = new Options();
                options.addOption("t", TOPIC, true, TOPIC);
                options.addOption("f", "filename", true, "filename");
                CommandLine parse = new BasicParser().parse(options, strArr);
                String optionValue = parse.getOptionValue("t");
                String optionValue2 = parse.getOptionValue("f");
                Configuration configuration = ApplicationProperties.get();
                String[] stringArray = configuration.getStringArray(ATLAS_ENDPOINT);
                if (stringArray == null || stringArray.length == 0) {
                    stringArray = new String[]{DEFAULT_ATLAS_URL};
                }
                if (AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
                    atlasClientV2 = new AtlasClientV2(currentUser, currentUser.getShortUserName(), stringArray);
                } else {
                    atlasClientV2 = new AtlasClientV2(stringArray, AuthenticationUtil.getBasicAuthenticationInput());
                }
                KafkaUtils kafkaUtils2 = new KafkaUtils(configuration);
                KafkaBridge kafkaBridge = new KafkaBridge(configuration, atlasClientV2, kafkaUtils2);
                if (StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) {
                    KAFKA_SCHEMA_REGISTRY_HOSTNAME = KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE;
                }
                if (StringUtils.isNotEmpty(optionValue2)) {
                    File file = new File(optionValue2);
                    if (file.exists() && file.canRead()) {
                        BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
                        while (true) {
                            String readLine = bufferedReader.readLine();
                            if (readLine == null) {
                                break;
                            } else {
                                kafkaBridge.importTopic(readLine.trim());
                            }
                        }
                        i = EXIT_CODE_SUCCESS;
                    } else {
                        LOG.error("Failed to read the file");
                    }
                } else {
                    kafkaBridge.importTopic(optionValue);
                    i = EXIT_CODE_SUCCESS;
                }
                if (atlasClientV2 != null) {
                    atlasClientV2.close();
                }
                if (kafkaUtils2 != null) {
                    kafkaUtils2.close();
                }
                if (EXIT_CODE_SUCCESS != 0) {
                    try {
                        closeableHttpClient.close();
                    } catch (IOException e) {
                        LOG.error("Could not close http client: ", e);
                    }
                }
            } catch (Throwable th) {
                if (EXIT_CODE_SUCCESS != 0) {
                    atlasClientV22.close();
                }
                if (EXIT_CODE_SUCCESS != 0) {
                    kafkaUtils.close();
                }
                if (EXIT_CODE_SUCCESS != 0) {
                    try {
                        closeableHttpClient.close();
                    } catch (IOException e2) {
                        LOG.error("Could not close http client: ", e2);
                    }
                }
                throw th;
            }
        } catch (ParseException e3) {
            LOG.error("Failed to parse arguments. Error: ", e3.getMessage());
            printUsage();
            if (EXIT_CODE_SUCCESS != 0) {
                atlasClientV22.close();
            }
            if (EXIT_CODE_SUCCESS != 0) {
                kafkaUtils.close();
            }
            if (EXIT_CODE_SUCCESS != 0) {
                try {
                    closeableHttpClient.close();
                } catch (IOException e4) {
                    LOG.error("Could not close http client: ", e4);
                }
            }
        } catch (Exception e5) {
            System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message");
            e5.printStackTrace();
            LOG.error("ImportKafkaEntities failed", e5);
            if (EXIT_CODE_SUCCESS != 0) {
                atlasClientV22.close();
            }
            if (EXIT_CODE_SUCCESS != 0) {
                kafkaUtils.close();
            }
            if (EXIT_CODE_SUCCESS != 0) {
                try {
                    closeableHttpClient.close();
                } catch (IOException e6) {
                    LOG.error("Could not close http client: ", e6);
                }
            }
        }
        System.out.print("\n\n");
        System.exit(i);
    }

    public KafkaBridge(Configuration configuration, AtlasClientV2 atlasClientV2, KafkaUtils kafkaUtils) throws Exception {
        this.atlasClientV2 = atlasClientV2;
        this.metadataNamespace = getMetadataNamespace(configuration);
        this.kafkaUtils = kafkaUtils;
        this.availableTopics = this.kafkaUtils.listAllTopics();
    }

    private String getMetadataNamespace(Configuration configuration) {
        return AtlasConfigurationUtil.getRecentString(configuration, KAFKA_METADATA_NAMESPACE, getClusterName(configuration));
    }

    private String getClusterName(Configuration configuration) {
        return configuration.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
    }

    public void importTopic(String str) throws Exception {
        List<String> list = this.availableTopics;
        if (StringUtils.isNotEmpty(str)) {
            ArrayList arrayList = new ArrayList();
            for (String str2 : list) {
                if (Pattern.compile(str).matcher(str2).matches()) {
                    arrayList.add(str2);
                }
            }
            list = arrayList;
        }
        if (CollectionUtils.isNotEmpty(list)) {
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                createOrUpdateTopic(it.next());
            }
        }
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo createOrUpdateTopic(String str) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo updateEntityInAtlas;
        String topicQualifiedName = getTopicQualifiedName(this.metadataNamespace, str);
        AtlasEntity.AtlasEntityWithExtInfo findEntityInAtlas = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
        System.out.print("\n");
        if (findEntityInAtlas == null) {
            System.out.println("Adding Kafka topic " + str);
            LOG.info("Importing Kafka topic: {}", topicQualifiedName);
            updateEntityInAtlas = createEntityInAtlas(new AtlasEntity.AtlasEntityWithExtInfo(getTopicEntity(str, null)));
        } else {
            System.out.println("Updating Kafka topic " + str);
            LOG.info("Kafka topic {} already exists in Atlas. Updating it..", topicQualifiedName);
            findEntityInAtlas.setEntity(getTopicEntity(str, findEntityInAtlas.getEntity()));
            updateEntityInAtlas = updateEntityInAtlas(findEntityInAtlas);
        }
        return updateEntityInAtlas;
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo createOrUpdateSchema(String str, String str2, String str3, int i) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo updateEntityInAtlas;
        String schemaQualifiedName = getSchemaQualifiedName(this.metadataNamespace, str2 + "-value", "v" + i);
        AtlasEntity.AtlasEntityWithExtInfo findEntityInAtlas = findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(), schemaQualifiedName);
        if (findEntityInAtlas == null) {
            System.out.println("---Adding Kafka schema " + str);
            LOG.info("Importing Kafka schema: {}", schemaQualifiedName);
            updateEntityInAtlas = createEntityInAtlas(new AtlasEntity.AtlasEntityWithExtInfo(getSchemaEntity(str, str2, str3, i, null)));
        } else {
            System.out.println("---Updating Kafka schema " + str);
            LOG.info("Kafka schema {} already exists in Atlas. Updating it..", schemaQualifiedName);
            findEntityInAtlas.setEntity(getSchemaEntity(str, str2, str3, i, findEntityInAtlas.getEntity()));
            updateEntityInAtlas = updateEntityInAtlas(findEntityInAtlas);
        }
        return updateEntityInAtlas;
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo createOrUpdateField(Schema.Field field, String str, String str2, int i, String str3) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo updateEntityInAtlas;
        String concatFullname = concatFullname(field.name(), str3, "");
        String fieldQualifiedName = getFieldQualifiedName(this.metadataNamespace, concatFullname, str + "-value", "v" + i);
        AtlasEntity.AtlasEntityWithExtInfo findEntityInAtlas = findEntityInAtlas(KafkaDataTypes.AVRO_FIELD.getName(), fieldQualifiedName);
        if (findEntityInAtlas == null) {
            System.out.println("---Adding Avro field " + concatFullname);
            LOG.info("Importing Avro field: {}", fieldQualifiedName);
            updateEntityInAtlas = createEntityInAtlas(new AtlasEntity.AtlasEntityWithExtInfo(getFieldEntity(field, str, str2, i, null, concatFullname)));
        } else {
            System.out.println("---Updating Avro field " + concatFullname);
            LOG.info("Avro field {} already exists in Atlas. Updating it..", fieldQualifiedName);
            findEntityInAtlas.setEntity(getFieldEntity(field, str, str2, i, findEntityInAtlas.getEntity(), concatFullname));
            updateEntityInAtlas = updateEntityInAtlas(findEntityInAtlas);
        }
        return updateEntityInAtlas;
    }

    @VisibleForTesting
    AtlasEntity getTopicEntity(String str, AtlasEntity atlasEntity) throws Exception {
        AtlasEntity atlasEntity2 = atlasEntity == null ? new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()) : atlasEntity;
        atlasEntity2.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getTopicQualifiedName(this.metadataNamespace, str));
        atlasEntity2.setAttribute(CLUSTERNAME, this.metadataNamespace);
        atlasEntity2.setAttribute(TOPIC, str);
        atlasEntity2.setAttribute(NAME, str);
        atlasEntity2.setAttribute(DESCRIPTION_ATTR, str);
        atlasEntity2.setAttribute(URI, str);
        try {
            atlasEntity2.setAttribute(PARTITION_COUNT, this.kafkaUtils.getPartitionCount(str));
            atlasEntity2.setAttribute(REPLICATION_FACTOR, this.kafkaUtils.getReplicationFactor(str));
            List<AtlasEntity> findOrCreateAtlasSchema = findOrCreateAtlasSchema(str);
            if (findOrCreateAtlasSchema.size() > 0) {
                atlasEntity2.setAttribute(AVRO_SCHEMA, findOrCreateAtlasSchema);
                atlasEntity2.setRelationshipAttribute(AVRO_SCHEMA, findOrCreateAtlasSchema);
            }
            return atlasEntity2;
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("Error while getting partition data for topic :" + str, e);
            throw new Exception("Error while getting partition data for topic :" + str, e);
        }
    }

    @VisibleForTesting
    AtlasEntity getSchemaEntity(String str, String str2, String str3, int i, AtlasEntity atlasEntity) throws Exception {
        new ArrayList();
        AtlasEntity atlasEntity2 = atlasEntity == null ? new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName()) : atlasEntity;
        Schema parse = new Schema.Parser().parse(str);
        String schemaQualifiedName = getSchemaQualifiedName(this.metadataNamespace, str2 + "-value", "v" + i);
        if (str3 == null) {
            str3 = parse.getNamespace() != null ? parse.getNamespace() : KAFKA_METADATA_NAMESPACE;
        }
        atlasEntity2.setAttribute(ATTRIBUTE_QUALIFIED_NAME, schemaQualifiedName);
        atlasEntity2.setAttribute(TYPE, parse.getType());
        atlasEntity2.setAttribute(NAMESPACE, str3);
        atlasEntity2.setAttribute(NAME, parse.getName() + "(v" + i + ")");
        atlasEntity2.setAttribute(SCHEMA_VERSION_ID, Integer.valueOf(i));
        List<AtlasEntity> createNestedFields = createNestedFields(parse, str2, str3, i, "");
        if (createNestedFields.size() > 0) {
            atlasEntity2.setRelationshipAttribute(FIELDS, createNestedFields);
        }
        return atlasEntity2;
    }

    List<AtlasEntity> createNestedFields(Schema schema, String str, String str2, int i, String str3) throws Exception {
        ArrayList arrayList = new ArrayList();
        JSONParser jSONParser = new JSONParser();
        for (Schema.Field field : schema.getFields()) {
            if (field.schema().getType() == Schema.Type.ARRAY) {
                System.out.println("ARRAY DETECTED");
                Schema parse = new Schema.Parser().parse(((JSONObject) jSONParser.parse(field.schema().toString())).get("items").toString());
                str3 = concatFullname(field.name(), str3, parse.getName());
                arrayList.addAll(createNestedFields(parse, str, str2, i, str3));
            } else if (field.schema().getType() != Schema.Type.RECORD || str.equals(field.name())) {
                arrayList.add(createOrUpdateField(field, str, str2, i, str3).getEntity());
            } else {
                System.out.println("NESTED RECORD DETECTED");
                str3 = concatFullname(field.name(), str3, "");
                arrayList.addAll(createNestedFields(field.schema(), str, str2, i, str3));
            }
        }
        arrayList.sort((atlasEntity, atlasEntity2) -> {
            return (atlasEntity.getAttribute(NAME) == null || atlasEntity2.getAttribute(NAME) == null) ? EXIT_CODE_SUCCESS : atlasEntity.getAttribute(NAME).toString().compareTo(atlasEntity2.getAttribute(NAME).toString());
        });
        return arrayList;
    }

    @VisibleForTesting
    AtlasEntity getFieldEntity(Schema.Field field, String str, String str2, int i, AtlasEntity atlasEntity, String str3) throws Exception {
        AtlasEntity atlasEntity2 = atlasEntity == null ? new AtlasEntity(KafkaDataTypes.AVRO_FIELD.getName()) : atlasEntity;
        atlasEntity2.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getFieldQualifiedName(this.metadataNamespace, str3, str + "-value", "v" + i));
        atlasEntity2.setAttribute(NAME, str3 + "(v" + i + ")");
        atlasEntity2.setAttribute(DESCRIPTION_ATTR, field.schema().getType());
        return atlasEntity2;
    }

    @VisibleForTesting
    static String getTopicQualifiedName(String str, String str2) {
        return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, str2.toLowerCase(), str);
    }

    @VisibleForTesting
    static String getSchemaQualifiedName(String str, String str2, String str3) {
        return String.format(FORMAT_KAKFA_SCHEMA_QUALIFIED_NAME, str2.toLowerCase(), str3, str);
    }

    @VisibleForTesting
    static String getFieldQualifiedName(String str, String str2, String str3, String str4) {
        return String.format(FORMAT_KAKFA_FIELD_QUALIFIED_NAME, str2.toLowerCase(), str3.toLowerCase(), str4, str);
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo findEntityInAtlas(String str, String str2) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = EXIT_CODE_SUCCESS;
        try {
            atlasEntityWithExtInfo = this.atlasClientV2.getEntityByAttribute(str, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, str2));
        } catch (Exception e) {
            LOG.info("Exception on finding Atlas Entity: {}", e);
        }
        return atlasEntityWithExtInfo;
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo createEntityInAtlas(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo2 = EXIT_CODE_SUCCESS;
        List createdEntities = this.atlasClientV2.createEntity(atlasEntityWithExtInfo).getCreatedEntities();
        if (CollectionUtils.isNotEmpty(createdEntities)) {
            atlasEntityWithExtInfo2 = this.atlasClientV2.getEntityByGuid(((AtlasEntityHeader) createdEntities.get(EXIT_CODE_SUCCESS)).getGuid());
            LOG.info("Created {} entity: name={}, guid={}", new Object[]{atlasEntityWithExtInfo2.getEntity().getTypeName(), atlasEntityWithExtInfo2.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), atlasEntityWithExtInfo2.getEntity().getGuid()});
        }
        return atlasEntityWithExtInfo2;
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo2;
        EntityMutationResponse updateEntity = this.atlasClientV2.updateEntity(atlasEntityWithExtInfo);
        if (updateEntity != null) {
            List updatedEntities = updateEntity.getUpdatedEntities();
            if (CollectionUtils.isNotEmpty(updatedEntities)) {
                atlasEntityWithExtInfo2 = this.atlasClientV2.getEntityByGuid(((AtlasEntityHeader) updatedEntities.get(EXIT_CODE_SUCCESS)).getGuid());
                LOG.info("Updated {} entity: name={}, guid={} ", new Object[]{atlasEntityWithExtInfo2.getEntity().getTypeName(), atlasEntityWithExtInfo2.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), atlasEntityWithExtInfo2.getEntity().getGuid()});
            } else {
                LOG.info("Entity: name={} ", atlasEntityWithExtInfo.toString() + " not updated as it is unchanged from what is in Atlas");
                atlasEntityWithExtInfo2 = atlasEntityWithExtInfo;
            }
        } else {
            LOG.info("Entity: name={} ", atlasEntityWithExtInfo.toString() + " not updated as it is unchanged from what is in Atlas");
            atlasEntityWithExtInfo2 = atlasEntityWithExtInfo;
        }
        return atlasEntityWithExtInfo2;
    }

    private static void printUsage() {
        System.out.println("Usage 1: import-kafka.sh");
        System.out.println("Usage 2: import-kafka.sh [-t <topic regex> OR --topic <topic regex>]");
        System.out.println("Usage 3: import-kafka.sh [-f <filename>]");
        System.out.println("   Format:");
        System.out.println("        topic1 OR topic1 regex");
        System.out.println("        topic2 OR topic2 regex");
        System.out.println("        topic3 OR topic3 regex");
    }

    private void clearRelationshipAttributes(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        if (atlasEntityWithExtInfo != null) {
            clearRelationshipAttributes(atlasEntityWithExtInfo.getEntity());
            if (atlasEntityWithExtInfo.getReferredEntities() != null) {
                clearRelationshipAttributes(atlasEntityWithExtInfo.getReferredEntities().values());
            }
        }
    }

    private void clearRelationshipAttributes(Collection<AtlasEntity> collection) {
        if (collection != null) {
            Iterator<AtlasEntity> it = collection.iterator();
            while (it.hasNext()) {
                clearRelationshipAttributes(it.next());
            }
        }
    }

    private void clearRelationshipAttributes(AtlasEntity atlasEntity) {
        if (atlasEntity == null || atlasEntity.getRelationshipAttributes() == null) {
            return;
        }
        atlasEntity.getRelationshipAttributes().clear();
    }

    private List<AtlasEntity> findOrCreateAtlasSchema(String str) throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(this.httpClient, str + "-value").iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            String schemaFromKafkaSchemaRegistry = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(this.httpClient, str + "-value", intValue);
            if (schemaFromKafkaSchemaRegistry != null) {
                System.out.println("---Found Schema " + str + "-value in Kafka Schema Registry with Version " + intValue);
                LOG.info("Found Schema {}-value in Kafka Schema Registry with Version {}", str, Integer.valueOf(intValue));
                if (findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(), getSchemaQualifiedName(this.metadataNamespace, str + "-value", "v" + intValue)) != null) {
                    System.out.println("---Found Entity avro_schema " + str + " in Atlas");
                    LOG.info("Found Entity avro_schema {} in Atlas", str);
                    arrayList.add(createOrUpdateSchema(schemaFromKafkaSchemaRegistry, str, null, intValue).getEntity());
                } else {
                    System.out.println("---NOT Found Entity avro_schema " + str + " in Atlas");
                    LOG.info("NOT Found Entity avro_schema {} in Atlas", str);
                    arrayList.add(createOrUpdateSchema(schemaFromKafkaSchemaRegistry, str, null, intValue).getEntity());
                }
            }
        }
        return arrayList;
    }

    private String concatFullname(String str, String str2, String str3) {
        return str2.isEmpty() ? str3.isEmpty() ? str : str + "." + str3 : str3.isEmpty() ? str2 + "." + str : str2 + "." + str3 + "." + str;
    }
}
