/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.mapred;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroMultipleInputs;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.Pair;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestAvroMultipleInputs {
    @TempDir
    public File OUTPUT_DIR;
    @TempDir
    public File INPUT_DIR_1;
    @TempDir
    public File INPUT_DIR_2;

    /**
     * Runs a two-input join job end to end: writes one Avro file of
     * {@link NamesRecord}s and one of {@link BalancesRecord}s, registers each
     * input directory with its own mapper through {@link AvroMultipleInputs},
     * reduces with {@link ReduceImpl} in a single reduce task, and validates
     * the resulting {@code part-00000.avro}.
     *
     * @throws Exception if writing the inputs, running the job, or validating the output fails
     */
    @Test
    void job() throws Exception {
        JobConf conf = new JobConf();
        Path namesInput = new Path(INPUT_DIR_1.getPath());
        Path balancesInput = new Path(INPUT_DIR_2.getPath());
        Path joinedOutput = new Path(OUTPUT_DIR.getPath());

        // Recursively remove the output path before the job runs.
        // NOTE(review): presumably the job refuses a pre-existing output directory - confirm.
        joinedOutput.getFileSystem(conf).delete(joinedOutput, true);

        writeNamesFiles(new File(namesInput.toUri().getPath()));
        writeBalancesFiles(new File(balancesInput.toUri().getPath()));

        conf.setJobName("multiple-inputs-join");

        // Each input path gets its own mapper and its own reflect-derived input schema.
        ReflectData reflect = ReflectData.get();
        Schema namesSchema = reflect.getSchema(NamesRecord.class);
        AvroMultipleInputs.addInputPath(conf, namesInput, NamesMapImpl.class, namesSchema);
        Schema balancesSchema = reflect.getSchema(BalancesRecord.class);
        AvroMultipleInputs.addInputPath(conf, balancesInput, BalancesMapImpl.class, balancesSchema);

        // Both mappers emit the same (KeyRecord, JoinableRecord) pair type.
        Schema keySchema = reflect.getSchema(KeyRecord.class);
        Schema valueSchema = reflect.getSchema(JoinableRecord.class);
        Schema mapOutputSchema = Pair.getPairSchema(keySchema, valueSchema);
        AvroJob.setMapOutputSchema(conf, mapOutputSchema);
        AvroJob.setOutputSchema(conf, reflect.getSchema(CompleteRecord.class));

        AvroJob.setReducerClass(conf, ReduceImpl.class);
        conf.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(conf, joinedOutput);
        AvroJob.setReflect(conf);

        JobClient.runJob(conf);

        validateCompleteFile(new File(OUTPUT_DIR, "part-00000.avro"));
    }

    /**
     * Writes the "names" input of the join: five {@link NamesRecord}s with
     * ids 0 through 4 and names {@code "record" + id}, stored as
     * {@code names.avro} inside the given directory.
     *
     * @param dir directory that receives {@code names.avro}
     * @throws IOException if the Avro data file cannot be created or written
     */
    private void writeNamesFiles(File dir) throws IOException {
        DatumWriter<NamesRecord> writer = new ReflectDatumWriter<>();
        File namesFile = new File(dir, "names.avro");
        try (DataFileWriter<NamesRecord> out = new DataFileWriter<>(writer)) {
            out.create(ReflectData.get().getSchema(NamesRecord.class), namesFile);
            for (int i = 0; i < 5; i++) {
                out.append(new NamesRecord(i, "record" + i));
            }
        }
    }

    /**
     * Writes the "balances" input of the join: five {@link BalancesRecord}s
     * with ids 0 through 4 and balances {@code id + 100}, stored as
     * {@code balances.avro} inside the given directory.
     *
     * @param dir directory that receives {@code balances.avro}
     * @throws IOException if the Avro data file cannot be created or written
     */
    private void writeBalancesFiles(File dir) throws IOException {
        DatumWriter<BalancesRecord> writer = new ReflectDatumWriter<>();
        File balancesFile = new File(dir, "balances.avro");
        try (DataFileWriter<BalancesRecord> out = new DataFileWriter<>(writer)) {
            out.create(ReflectData.get().getSchema(BalancesRecord.class), balancesFile);
            for (int i = 0; i < 5; i++) {
                out.append(new BalancesRecord(i, i + 100L));
            }
        }
    }

    /**
     * Reads the job output and checks every joined record.
     *
     * <p>Records are expected in ascending id order starting at 0, each with
     * {@code balance == id + 100} and {@code name == "record" + id}, and the
     * file must contain exactly five of them.
     *
     * @param file the Avro data file produced by the job
     * @throws Exception if the file cannot be opened or decoded
     */
    private void validateCompleteFile(File file) throws Exception {
        DatumReader<CompleteRecord> reader = new ReflectDatumReader<>();
        int numRecs = 0;
        try (InputStream in = new BufferedInputStream(new FileInputStream(file));
             DataFileStream<CompleteRecord> records = new DataFileStream<>(in, reader)) {
            for (CompleteRecord rec : records) {
                // JUnit's contract is assertEquals(expected, actual); keeping that
                // order makes failure messages point at the right side.
                Assertions.assertEquals(numRecs, rec.id);
                Assertions.assertEquals(rec.id + 100L, rec.balance);
                // name is declared as CharSequence; compare its String form so the
                // check does not depend on which implementation the reader produced.
                Assertions.assertEquals("record" + rec.id, rec.name.toString());
                numRecs++;
            }
        }
        Assertions.assertEquals(5, numRecs);
    }

    public static class NamesMapImpl
    extends AvroMapper<NamesRecord, Pair<KeyRecord, JoinableRecord>> {
        public void map(NamesRecord nameRecord, AvroCollector<Pair<KeyRecord, JoinableRecord>> collector, Reporter reporter) throws IOException {
            collector.collect((Object)new Pair((Object)new KeyRecord(nameRecord.id), (Object)new JoinableRecord(nameRecord.getClass().getName(), nameRecord.id, nameRecord.name, -1L)));
        }
    }

    /**
     * Input record for the "names" side of the join: an id and its name.
     * Defaults are id {@code -1} and an empty name.
     */
    public static class NamesRecord {
        private int id = -1;
        private CharSequence name = "";

        /** Creates a record that keeps the field defaults. */
        public NamesRecord() {
        }

        /**
         * @param id   record id
         * @param name name associated with the id
         */
        public NamesRecord(int id, CharSequence name) {
            this.name = name;
            this.id = id;
        }

        /** @return the id and the name separated by a tab */
        @Override
        public String toString() {
            return String.join("\t", String.valueOf(id), String.valueOf(name));
        }
    }

    public static class BalancesMapImpl
    extends AvroMapper<BalancesRecord, Pair<KeyRecord, JoinableRecord>> {
        public void map(BalancesRecord balanceRecord, AvroCollector<Pair<KeyRecord, JoinableRecord>> collector, Reporter reporter) throws IOException {
            collector.collect((Object)new Pair((Object)new KeyRecord(balanceRecord.id), (Object)new JoinableRecord(balanceRecord.getClass().getName(), balanceRecord.id, "", balanceRecord.balance)));
        }
    }

    /**
     * Input record for the "balances" side of the join: an id and its balance.
     * Defaults are id {@code -1} and balance {@code 0}.
     */
    public static class BalancesRecord {
        private int id = -1;
        private long balance = 0L;

        /** Creates a record that keeps the field defaults. */
        public BalancesRecord() {
        }

        /**
         * @param id      record id
         * @param balance balance associated with the id
         */
        public BalancesRecord(int id, long balance) {
            this.balance = balance;
            this.id = id;
        }

        /** @return the id and the balance separated by a tab */
        @Override
        public String toString() {
            return String.join("\t", Integer.toString(id), Long.toString(balance));
        }
    }

    /**
     * Join key emitted by both mappers: wraps the record id.
     * The default id is {@code -1}.
     */
    public static class KeyRecord {
        private int id = -1;

        /** Creates a key that keeps the default id. */
        public KeyRecord() {
        }

        /** @param id the id to join on */
        public KeyRecord(int id) {
            this.id = id;
        }

        /** @return the decimal string form of the id */
        @Override
        public String toString() {
            return String.valueOf(id);
        }
    }

    /**
     * Common map-output value for both inputs. Carries the union of the name
     * and balance fields plus {@code recType}, a tag naming the kind of input
     * record the value was built from.
     */
    public static class JoinableRecord {
        private int id = -1;
        private CharSequence name = "";
        private long balance = 0L;
        private CharSequence recType = "";

        /** Creates a record that keeps the field defaults. */
        public JoinableRecord() {
        }

        /**
         * @param recType tag identifying the source record type
         * @param id      record id
         * @param name    name, or a placeholder when the source has none
         * @param balance balance, or a placeholder when the source has none
         */
        public JoinableRecord(CharSequence recType, int id, CharSequence name, long balance) {
            this.recType = recType;
            this.id = id;
            this.name = name;
            this.balance = balance;
        }

        /** @return the string form of the record-type tag */
        @Override
        public String toString() {
            CharSequence tag = recType;
            return tag.toString();
        }
    }

    /**
     * Joined output record: an id together with its name and balance.
     * Defaults are id {@code -1}, an empty name and balance {@code 0}.
     */
    public static class CompleteRecord {
        private int id = -1;
        private CharSequence name = "";
        private long balance = 0L;

        /** Creates a record that keeps the field defaults. */
        public CompleteRecord() {
        }

        /**
         * @param id      record id
         * @param name    name associated with the id
         * @param balance balance associated with the id
         */
        public CompleteRecord(int id, CharSequence name, long balance) {
            this.id = id;
            this.name = name;
            this.balance = balance;
        }

        /** @param id the new id */
        void setId(int id) {
            this.id = id;
        }

        /** @param name the new name */
        void setName(CharSequence name) {
            this.name = name;
        }

        /** @param balance the new balance */
        void setBalance(long balance) {
            this.balance = balance;
        }

        /** @return id, name and balance, in that order, separated by tabs */
        @Override
        public String toString() {
            return String.join("\t", Integer.toString(id), String.valueOf(name), Long.toString(balance));
        }
    }

    public static class ReduceImpl
    extends AvroReducer<KeyRecord, JoinableRecord, CompleteRecord> {
        public void reduce(KeyRecord ID, Iterable<JoinableRecord> joinables, AvroCollector<CompleteRecord> collector, Reporter reporter) throws IOException {
            CompleteRecord rec = new CompleteRecord();
            for (JoinableRecord joinable : joinables) {
                rec.setId(joinable.id);
                if (joinable.recType.toString().contains("NamesRecord")) {
                    rec.setName(joinable.name);
                    continue;
                }
                rec.setBalance(joinable.balance);
            }
            collector.collect((Object)rec);
        }
    }
}

