package org.apache.spark.streaming;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
import org.apache.spark.streaming.util.WriteAheadLogUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.sparkproject.guava.collect.Iterators;

/* loaded from: input_file:org/apache/spark/streaming/JavaWriteAheadLogSuite.class */
public class JavaWriteAheadLogSuite extends WriteAheadLog {
    private int index = -1;
    private final List<Record> records = new ArrayList();

    /* loaded from: input_file:org/apache/spark/streaming/JavaWriteAheadLogSuite$JavaWriteAheadLogSuiteHandle.class */
    static class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle {
        int index;

        JavaWriteAheadLogSuiteHandle(int i) {
            this.index = -1;
            this.index = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/streaming/JavaWriteAheadLogSuite$Record.class */
    public static class Record {
        long time;
        int index;
        ByteBuffer buffer;

        Record(long j, int i, ByteBuffer byteBuffer) {
            this.index = i;
            this.time = j;
            this.buffer = byteBuffer;
        }
    }

    public WriteAheadLogRecordHandle write(ByteBuffer byteBuffer, long j) {
        this.index++;
        this.records.add(new Record(j, this.index, byteBuffer));
        return new JavaWriteAheadLogSuiteHandle(this.index);
    }

    public ByteBuffer read(WriteAheadLogRecordHandle writeAheadLogRecordHandle) {
        if (!(writeAheadLogRecordHandle instanceof JavaWriteAheadLogSuiteHandle)) {
            return null;
        }
        int i = ((JavaWriteAheadLogSuiteHandle) writeAheadLogRecordHandle).index;
        for (Record record : this.records) {
            if (record.index == i) {
                return record.buffer;
            }
        }
        return null;
    }

    public Iterator<ByteBuffer> readAll() {
        return Iterators.transform(this.records.iterator(), record -> {
            return record.buffer;
        });
    }

    public void clean(long j, boolean z) {
        int i = 0;
        while (i < this.records.size()) {
            if (this.records.get(i).time < j) {
                this.records.remove(i);
                i--;
            }
            i++;
        }
    }

    public void close() {
        this.records.clear();
    }

    @Test
    public void testCustomWAL() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
        sparkConf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false");
        WriteAheadLog createLogForDriver = WriteAheadLogUtils.createLogForDriver(sparkConf, (String) null, (Configuration) null);
        WriteAheadLogRecordHandle write = createLogForDriver.write(JavaUtils.stringToBytes("data1"), 1234L);
        Assertions.assertTrue(write instanceof JavaWriteAheadLogSuiteHandle);
        Assertions.assertEquals("data1", JavaUtils.bytesToString(createLogForDriver.read(write)));
        createLogForDriver.write(JavaUtils.stringToBytes("data2"), 1235L);
        createLogForDriver.write(JavaUtils.stringToBytes("data3"), 1236L);
        createLogForDriver.write(JavaUtils.stringToBytes("data4"), 1237L);
        createLogForDriver.clean(1236L, false);
        Iterator readAll = createLogForDriver.readAll();
        ArrayList arrayList = new ArrayList();
        while (readAll.hasNext()) {
            arrayList.add(JavaUtils.bytesToString((ByteBuffer) readAll.next()));
        }
        Assertions.assertEquals(Arrays.asList("data3", "data4"), arrayList);
    }
}
