/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.history.logging.proto;

import com.google.protobuf.CodedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.proto.DagManifesFileScanner;
import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
import org.apache.tez.dag.history.logging.proto.HistoryEventProtoConverter;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos;
import org.apache.tez.dag.history.logging.proto.ProtoHistoryLoggingService;
import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
import org.apache.tez.dag.history.logging.proto.ProtoMessageWritable;
import org.apache.tez.dag.history.logging.proto.TezProtoLoggers;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

public class TestProtoHistoryLoggingService {
    /** Application id used for every event and mocked context in this class. */
    private static final ApplicationId appId = ApplicationId.newInstance(1000L, 1);

    /** First attempt of {@link #appId}; its string form is used to name the app and manifest files. */
    private static final ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);

    /** User name reported by the mocked application context and recorded in the events. */
    private static final String user = "TEST_USER";

    /** Clock handed to the service and to the loggers; assigned by {@code createService}. */
    private Clock clock;

    /** Scratch directory provider; each test asks it for a fresh folder. */
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Runs the service with DAG-start splitting disabled and verifies everything it wrote:
     * the DAG event file, the application event file, the manifest entry (paths and
     * offsets) and finally the manifest scanner.
     *
     * <p>Every reader is scoped with try-with-resources and the scanner with
     * try/finally, so file handles are released even when an assertion fails.
     *
     * @throws Exception if the service, the loggers or a reader fails
     */
    @Test
    public void testService() throws Exception {
        ProtoHistoryLoggingService service = this.createService(false);
        service.start();
        TezDAGID dagId = TezDAGID.getInstance(appId, 0);
        List<HistoryLoggerProtos.HistoryEventProto> protos = new ArrayList<>();
        try {
            for (DAGHistoryEvent event : this.makeHistoryEvents(dagId, service)) {
                protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent()));
                service.handle(event);
            }
        } finally {
            // Always stop the service, even if handling an event blew up.
            service.stop();
        }

        TezProtoLoggers loggers = new TezProtoLoggers();
        Assert.assertTrue(loggers.setup(service.getConfig(), this.clock));
        // The test clock starts at 0, so all files are looked up under epoch day 0.
        LocalDate epochDay = LocalDate.ofEpochDay(0L);

        // DAG events: everything except the app-launched event at index 0.
        DatePartitionedLogger dagLogger = loggers.getDagEventsLogger();
        Path dagFilePath = dagLogger.getPathForDate(epochDay, dagId + "_1");
        try (ProtoMessageReader reader = dagLogger.getReader(dagFilePath)) {
            this.assertEventsRead((ProtoMessageReader<HistoryLoggerProtos.HistoryEventProto>) reader, protos, 1, protos.size());
        }

        // App events: the first record must be the app-launched event.
        DatePartitionedLogger appLogger = loggers.getAppEventsLogger();
        Path appFilePath = appLogger.getPathForDate(epochDay, attemptId.toString());
        long appOffset;
        try (ProtoMessageReader appReader = appLogger.getReader(appFilePath)) {
            appOffset = appReader.getOffset();
            Assert.assertEquals(protos.get(0), appReader.readEvent());
        }

        // Manifest: a single entry pointing at the two files above.
        DatePartitionedLogger manifestLogger = loggers.getManifestEventsLogger();
        Path manifestFilePath = manifestLogger.getPathForDate(epochDay, attemptId.toString());
        HistoryLoggerProtos.ManifestEntryProto manifest;
        try (ProtoMessageReader manifestReader = manifestLogger.getReader(manifestFilePath)) {
            manifest = (HistoryLoggerProtos.ManifestEntryProto) manifestReader.readEvent();
        }
        Assert.assertEquals(appId.toString(), manifest.getAppId());
        Assert.assertEquals(dagId.toString(), manifest.getDagId());
        Assert.assertEquals(dagFilePath.toString(), manifest.getDagFilePath());
        Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath());
        Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset());

        // The offsets stored in the manifest must land on the submitted/finished events.
        try (ProtoMessageReader reader = dagLogger.getReader(new Path(manifest.getDagFilePath()))) {
            reader.setOffset(manifest.getDagSubmittedEventOffset());
            HistoryLoggerProtos.HistoryEventProto evt = (HistoryLoggerProtos.HistoryEventProto) reader.readEvent();
            Assert.assertNotNull(evt);
            Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), evt.getEventType());

            reader.setOffset(manifest.getDagFinishedEventOffset());
            evt = (HistoryLoggerProtos.HistoryEventProto) reader.readEvent();
            Assert.assertNotNull(evt);
            Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), evt.getEventType());
        }

        // The scanner must yield exactly that one manifest entry.
        DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
        try {
            Assert.assertEquals(manifest, scanner.getNext());
            Assert.assertNull(scanner.getNext());
        } finally {
            scanner.close();
        }
    }

    /**
     * Verifies that after reading a whole DAG event file the reader's underlying
     * {@link CodedInputStream} reports zero total bytes read (the counter is inspected
     * reflectively through {@code getTotalBytesRead}).
     *
     * @throws Exception if the service, the loggers, the reader or reflection fails
     */
    @Test
    public void testProtoMessageSizeReset() throws Exception {
        ProtoHistoryLoggingService service = this.createService(false);
        service.start();
        TezDAGID dagId = TezDAGID.getInstance(appId, 0);
        List<HistoryLoggerProtos.HistoryEventProto> protos = new ArrayList<>();
        try {
            for (DAGHistoryEvent event : this.makeHistoryEvents(dagId, service)) {
                protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent()));
                service.handle(event);
            }
        } finally {
            // Always stop the service, even if handling an event blew up.
            service.stop();
        }

        TezProtoLoggers loggers = new TezProtoLoggers();
        Assert.assertTrue(loggers.setup(service.getConfig(), this.clock));
        DatePartitionedLogger dagLogger = loggers.getDagEventsLogger();
        Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0L), dagId + "_1");
        try (ProtoMessageReader reader = dagLogger.getReader(dagFilePath)) {
            this.assertEventsRead((ProtoMessageReader<HistoryLoggerProtos.HistoryEventProto>) reader, protos, 1, protos.size());
            int totalBytesRead = TestProtoHistoryLoggingService.getTotalBytesRead((ProtoMessageReader<HistoryLoggerProtos.HistoryEventProto>) reader);
            // JUnit's contract is (expected, actual); the previous order produced a
            // misleading "expected:<N> but was:<0>" message on failure.
            Assert.assertEquals(0L, totalBytesRead);
        }
    }

    /**
     * Reflectively digs out the reader's {@code writable} field, then that object's
     * {@code cin} field, and reports how many bytes the coded stream says it has read.
     *
     * @param reader reader whose internal stream counter is inspected
     * @return the value of {@link CodedInputStream#getTotalBytesRead()}
     * @throws NoSuchFieldException if either private field is not declared on the runtime class
     * @throws IllegalAccessException if a field cannot be read
     */
    private static int getTotalBytesRead(ProtoMessageReader<HistoryLoggerProtos.HistoryEventProto> reader) throws NoSuchFieldException, IllegalAccessException {
        ProtoMessageWritable writable = (ProtoMessageWritable) readPrivateField(reader, "writable");
        CodedInputStream codedInput = (CodedInputStream) readPrivateField(writable, "cin");
        return codedInput.getTotalBytesRead();
    }

    /** Reads the field {@code fieldName} declared on {@code owner}'s runtime class, bypassing access checks. */
    private static Object readPrivateField(Object owner, String fieldName) throws NoSuchFieldException, IllegalAccessException {
        Field field = owner.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(owner);
    }

    /**
     * Runs the service with DAG-start splitting enabled and verifies that the DAG events
     * are spread over two files, that the application event file starts with the
     * app-launched event, and that the manifest holds one entry per DAG file (the first
     * carrying the submitted offset, the second the finished offset).
     *
     * <p>Every reader is declared in its own try-with-resources statement; the
     * previous form reused a {@code reader} variable outside the scope that declared it
     * and never closed the manifest reader.
     *
     * @throws Exception if the service, the loggers or a reader fails
     */
    @Test
    public void testServiceSplitEvents() throws Exception {
        ProtoHistoryLoggingService service = this.createService(true);
        service.start();
        TezDAGID dagId = TezDAGID.getInstance(appId, 0);
        List<HistoryLoggerProtos.HistoryEventProto> protos = new ArrayList<>();
        try {
            for (DAGHistoryEvent event : this.makeHistoryEvents(dagId, service)) {
                protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent()));
                service.handle(event);
            }
        } finally {
            // Always stop the service, even if handling an event blew up.
            service.stop();
        }

        TezProtoLoggers loggers = new TezProtoLoggers();
        Assert.assertTrue(loggers.setup(service.getConfig(), this.clock));
        // The test clock starts at 0, so all files are looked up under epoch day 0.
        LocalDate epochDay = LocalDate.ofEpochDay(0L);

        // Index 0 is the app-launched event; indices 1-3 (submitted, initialized, started)
        // belong to the first DAG file and everything from index 4 on to the second.
        int firstEventInSecondFile = 4;
        DatePartitionedLogger dagLogger = loggers.getDagEventsLogger();
        Path dagFilePath1 = dagLogger.getPathForDate(epochDay, dagId + "_1");
        // The second file carries an extra "_1" suffix (presumably the split-file suffix
        // used by the service -- verify against ProtoHistoryLoggingService).
        Path dagFilePath2 = dagLogger.getPathForDate(epochDay, dagId + "_1" + "_1");
        try (ProtoMessageReader reader = dagLogger.getReader(dagFilePath1)) {
            this.assertEventsRead((ProtoMessageReader<HistoryLoggerProtos.HistoryEventProto>) reader, protos, 1, firstEventInSecondFile);
        }
        try (ProtoMessageReader reader = dagLogger.getReader(dagFilePath2)) {
            this.assertEventsRead((ProtoMessageReader<HistoryLoggerProtos.HistoryEventProto>) reader, protos, firstEventInSecondFile, protos.size());
        }

        // App events: the first record must be the app-launched event.
        DatePartitionedLogger appLogger = loggers.getAppEventsLogger();
        Path appFilePath = appLogger.getPathForDate(epochDay, attemptId.toString());
        long appOffset;
        try (ProtoMessageReader appReader = appLogger.getReader(appFilePath)) {
            appOffset = appReader.getOffset();
            Assert.assertEquals(protos.get(0), appReader.readEvent());
        }

        // Manifest: read it both directly and through the scanner; the two must agree.
        DatePartitionedLogger manifestLogger = loggers.getManifestEventsLogger();
        DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
        try {
            Path manifestFilePath = manifestLogger.getPathForDate(epochDay, attemptId.toString());
            try (ProtoMessageReader manifestReader = manifestLogger.getReader(manifestFilePath)) {
                // First entry: points at the first DAG file and has no finished offset.
                HistoryLoggerProtos.ManifestEntryProto manifest = (HistoryLoggerProtos.ManifestEntryProto) manifestReader.readEvent();
                Assert.assertEquals(manifest, scanner.getNext());
                Assert.assertEquals(appId.toString(), manifest.getAppId());
                Assert.assertEquals(dagId.toString(), manifest.getDagId());
                Assert.assertEquals(dagFilePath1.toString(), manifest.getDagFilePath());
                Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath());
                Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset());
                Assert.assertEquals(-1L, manifest.getDagFinishedEventOffset());
                try (ProtoMessageReader reader = dagLogger.getReader(new Path(manifest.getDagFilePath()))) {
                    reader.setOffset(manifest.getDagSubmittedEventOffset());
                    HistoryLoggerProtos.HistoryEventProto evt = (HistoryLoggerProtos.HistoryEventProto) reader.readEvent();
                    Assert.assertNotNull(evt);
                    Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), evt.getEventType());
                }

                // Second entry: points at the second DAG file and has no submitted offset.
                manifest = (HistoryLoggerProtos.ManifestEntryProto) manifestReader.readEvent();
                Assert.assertEquals(manifest, scanner.getNext());
                Assert.assertEquals(appId.toString(), manifest.getAppId());
                Assert.assertEquals(dagId.toString(), manifest.getDagId());
                Assert.assertEquals(dagFilePath2.toString(), manifest.getDagFilePath());
                Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath());
                Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset());
                Assert.assertEquals(-1L, manifest.getDagSubmittedEventOffset());
                try (ProtoMessageReader reader = dagLogger.getReader(new Path(manifest.getDagFilePath()))) {
                    reader.setOffset(manifest.getDagFinishedEventOffset());
                    HistoryLoggerProtos.HistoryEventProto evt = (HistoryLoggerProtos.HistoryEventProto) reader.readEvent();
                    Assert.assertNotNull(evt);
                    Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), evt.getEventType());
                }
            }
            // Nothing may follow the two entries.
            Assert.assertNull(scanner.getNext());
        } finally {
            scanner.close();
        }
    }

    /**
     * Checks that constructing a {@link DatePartitionedLogger} on an existing directory
     * changes that directory's permissions to the expected value.
     *
     * @throws IOException if the temporary folder or the file system cannot be accessed
     */
    @Test
    public void testDirPermissions() throws IOException {
        final Path baseDir = new Path(this.tempFolder.newFolder().getAbsolutePath());
        final Configuration configuration = new Configuration();
        final FileSystem fileSystem = baseDir.getFileSystem(configuration);
        // Octal 01777 is the same value as decimal 1023 (conventionally rwxrwxrwx plus the sticky bit).
        final FsPermission wanted = FsPermission.createImmutable((short) 01777);

        // Precondition: the folder exists but does not have the wanted permissions yet.
        Assert.assertTrue(fileSystem.exists(baseDir));
        final FsPermission before = fileSystem.getFileStatus(baseDir).getPermission();
        Assert.assertNotEquals(wanted, before);

        // Only the constructor's side effect matters; the instance itself is discarded.
        final Clock loggerClock = new FixedClock(Time.now());
        new DatePartitionedLogger(HistoryLoggerProtos.HistoryEventProto.PARSER, baseDir, configuration, loggerClock);

        final FsPermission after = fileSystem.getFileStatus(baseDir).getPermission();
        Assert.assertEquals(wanted, after);
    }

    /**
     * Builds the ordered event sequence fed to the service by the tests: app launched,
     * DAG submitted / initialized / started, vertex / task / task-attempt started, and
     * finally DAG finished (in state {@code ERROR}).
     *
     * @param dagId   DAG the DAG-scoped events belong to
     * @param service service whose configuration is copied into the events that carry one
     * @return a new mutable list with the eight events in the order above
     */
    private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, ProtoHistoryLoggingService service) {
        final List<DAGHistoryEvent> events = new ArrayList<>();
        final DAGProtos.DAGPlan namedPlan = DAGProtos.DAGPlan.newBuilder().setName("DAGPlanMock").build();
        final long now = System.currentTimeMillis();
        final Configuration eventConf = new Configuration(service.getConfig());

        // Application level: the only event that is not tied to a DAG id.
        HistoryEvent appLaunched = new AppLaunchedEvent(appId, now, now, user, eventConf,
                new VersionInfo("component", "1.1.0", "rev1", "20120101", "git.apache.org"){});
        events.add(new DAGHistoryEvent(null, appLaunched));

        // DAG start-up.
        HistoryEvent dagSubmitted = new DAGSubmittedEvent(dagId, now, DAGProtos.DAGPlan.getDefaultInstance(),
                attemptId, null, user, eventConf, null, "default");
        events.add(new DAGHistoryEvent(dagId, dagSubmitted));
        HistoryEvent dagInitialized = new DAGInitializedEvent(dagId, now + 1L, user, "test_dag", Collections.emptyMap());
        events.add(new DAGHistoryEvent(dagId, dagInitialized));
        HistoryEvent dagStarted = new DAGStartedEvent(dagId, now + 2L, user, "test_dag");
        events.add(new DAGHistoryEvent(dagId, dagStarted));

        // Vertex, task and task attempt.
        final TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
        HistoryEvent vertexStarted = new VertexStartedEvent(vertexId, now, now);
        events.add(new DAGHistoryEvent(dagId, vertexStarted));
        final TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
        HistoryEvent taskStarted = new TaskStartedEvent(taskId, "test", now, now);
        events.add(new DAGHistoryEvent(dagId, taskStarted));
        HistoryEvent attemptStarted = new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(taskId, 1), "test", now,
                ContainerId.newContainerId(attemptId, 1L), NodeId.newInstance("localhost", 8765), null, null, null);
        events.add(new DAGHistoryEvent(dagId, attemptStarted));

        // DAG completion.
        HistoryEvent dagFinished = new DAGFinishedEvent(dagId, now, now, DAGState.ERROR, "diagnostics", null, user,
                namedPlan.getName(), new HashMap(), attemptId, namedPlan);
        events.add(new DAGHistoryEvent(dagId, dagFinished));
        return events;
    }

    /**
     * Creates and initialises (but does not start) a logging service that writes under a
     * fresh temporary folder and is backed by a mocked {@link AppContext}. Also replaces
     * {@code this.clock} with a new clock starting at 0.
     *
     * @param splitEvents value for the {@code tez.history.logging.split-dag-start} setting
     * @return the initialised service
     * @throws IOException if the temporary folder cannot be created
     */
    private ProtoHistoryLoggingService createService(boolean splitEvents) throws IOException {
        final ProtoHistoryLoggingService loggingService = new ProtoHistoryLoggingService();
        this.clock = new FixedClock(0L);

        // Mocked application context answering with the shared fixtures.
        final AppContext context = Mockito.mock(AppContext.class);
        Mockito.when(context.getApplicationID()).thenReturn(appId);
        Mockito.when(context.getApplicationAttemptId()).thenReturn(attemptId);
        Mockito.when(context.getUser()).thenReturn(user);
        Mockito.when(context.getHadoopShim()).thenReturn(new HadoopShim(){});
        Mockito.when(context.getClock()).thenReturn(this.clock);
        loggingService.setAppContext(context);

        // Configuration without defaults: local raw file system, temp base dir, split flag.
        final Configuration serviceConf = new Configuration(false);
        final String baseDir = this.tempFolder.newFolder().getAbsolutePath();
        serviceConf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
        serviceConf.set("tez.history.logging.proto-base-dir", baseDir);
        serviceConf.setBoolean("tez.history.logging.split-dag-start", splitEvents);

        loggingService.init(serviceConf);
        return loggingService;
    }

    /**
     * Asserts that {@code reader} yields exactly {@code protos[start .. finish)} in order
     * and then signals the end of the stream, either by returning {@code null} or by
     * throwing {@link EOFException}.
     *
     * @param reader reader positioned at the first expected event
     * @param protos full list of expected events
     * @param start  index of the first expected event (inclusive)
     * @param finish index after the last expected event (exclusive)
     * @throws Exception if reading fails for a reason other than end of file
     */
    private void assertEventsRead(ProtoMessageReader<HistoryLoggerProtos.HistoryEventProto> reader, List<HistoryLoggerProtos.HistoryEventProto> protos, int start, int finish) throws Exception {
        int index = start;
        while (index < finish) {
            try {
                HistoryLoggerProtos.HistoryEventProto actual = (HistoryLoggerProtos.HistoryEventProto) reader.readEvent();
                Assert.assertEquals(protos.get(index), actual);
            } catch (EOFException prematureEof) {
                // The stream ended before all expected events were seen.
                Assert.fail("Unexpected eof");
            }
            index++;
        }

        try {
            HistoryLoggerProtos.HistoryEventProto trailing = (HistoryLoggerProtos.HistoryEventProto) reader.readEvent();
            Assert.assertNull(trailing);
        } catch (EOFException ignored) {
            // Intentionally ignored: EOF is an equally valid end-of-stream signal here.
        }
    }

    private static class FixedClock
    implements Clock {
        final Clock clock = SystemClock.getInstance();
        final long diff;

        public FixedClock(long startTime) {
            this.diff = this.clock.getTime() - startTime;
        }

        public long getTime() {
            return this.clock.getTime() - this.diff;
        }
    }
}

