/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.stream.server;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.DataModelManager;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
import org.apache.kylin.stream.coordinator.StreamingUtils;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.client.HttpCoordinatorClient;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.consumer.EndPositionStopCondition;
import org.apache.kylin.stream.core.consumer.IConsumerProvider;
import org.apache.kylin.stream.core.consumer.IStopConsumptionCondition;
import org.apache.kylin.stream.core.consumer.IStreamingConnector;
import org.apache.kylin.stream.core.consumer.StreamingConsumerChannel;
import org.apache.kylin.stream.core.metrics.StreamingMetrics;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverStats;
import org.apache.kylin.stream.core.source.ISourcePosition;
import org.apache.kylin.stream.core.source.ISourcePositionHandler;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.storage.StreamingCubeSegment;
import org.apache.kylin.stream.core.storage.StreamingSegmentManager;
import org.apache.kylin.stream.core.storage.columnar.ColumnarStoreCache;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.NodeUtil;
import org.apache.kylin.stream.server.ReplicaSetLeaderSelector;
import org.apache.kylin.stream.server.retention.RetentionPolicyInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingServer
implements ReplicaSetLeaderSelector.LeaderChangeListener,
IConsumerProvider {
    private static final Logger logger = LoggerFactory.getLogger(StreamingServer.class);
    public static final int DEFAULT_PORT = 9090;
    private static final int CONSUMER_STOP_WAIT_TIMEOUT = 10000;
    private static volatile StreamingServer instance = null;
    private Map<String, StreamingConsumerChannel> cubeConsumerMap = Maps.newHashMap();
    private Map<String, List<Partition>> assignments = Maps.newHashMap();
    private Map<String, StreamingSegmentManager> streamingSegmentManagerMap = new ConcurrentHashMap<String, StreamingSegmentManager>();
    private CuratorFramework streamZKClient = StreamingUtils.getZookeeperClient();
    private ReplicaSetLeaderSelector leaderSelector;
    private CoordinatorClient coordinatorClient;
    private StreamMetadataStore streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
    private Node currentNode;
    private int replicaSetID = -1;
    private volatile boolean isLeader = false;
    private ScheduledExecutorService segmentStateCheckerExecutor;
    private ExecutorService segmentFlushExecutor;
    private final String baseStorePath;

    private StreamingServer() {
        this.coordinatorClient = new HttpCoordinatorClient(this.streamMetadataStore);
        this.currentNode = NodeUtil.getCurrentNode((int)9090);
        this.baseStorePath = this.calLocalSegmentCacheDir();
        this.segmentStateCheckerExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory("segment_state_check"));
        this.segmentFlushExecutor = Executors.newFixedThreadPool(5, (ThreadFactory)new NamedThreadFactory("segment_flush"));
    }

    @VisibleForTesting
    public void setCoordinatorClient(CoordinatorClient coordinatorClient) {
        this.coordinatorClient = coordinatorClient;
    }

    public static synchronized StreamingServer getInstance() {
        if (instance == null) {
            instance = new StreamingServer();
        }
        return instance;
    }

    public void start() throws Exception {
        this.registerReceiver();
        ReplicaSet rs = this.findBelongReplicaSet();
        if (rs != null) {
            this.addToReplicaSet(rs.getReplicaSetID());
        }
        this.startMetrics();
        this.startSegmentStateChecker();
        this.addShutdownHook();
    }

    private void startSegmentStateChecker() {
        this.segmentStateCheckerExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                Collection<StreamingSegmentManager> segmentManagers = StreamingServer.this.getAllCubeSegmentManagers();
                long curr = System.currentTimeMillis();
                for (StreamingSegmentManager segmentManager : segmentManagers) {
                    CubeInstance cubeInstance = segmentManager.getCubeInstance();
                    String cubeName = cubeInstance.getName();
                    try {
                        Collection activeSegments = segmentManager.getActiveSegments();
                        for (StreamingCubeSegment segment : activeSegments) {
                            long delta = curr - segment.getLastUpdateTime();
                            if (curr <= segment.getDateRangeEnd() || delta <= segmentManager.cubeDuration) continue;
                            logger.debug("Make {} immutable because it lastUpdate[{}] exceed wait duration.", (Object)segment.getSegmentName(), (Object)segment.getLastUpdateTime());
                            segmentManager.makeSegmentImmutable(segment.getSegmentName());
                        }
                        RetentionPolicyInfo retentionPolicyInfo = new RetentionPolicyInfo();
                        String policyName = cubeInstance.getConfig().getStreamingSegmentRetentionPolicy();
                        Map policyProps = cubeInstance.getConfig().getStreamingSegmentRetentionPolicyProperties(policyName);
                        retentionPolicyInfo.setName(policyName);
                        retentionPolicyInfo.setProperties(policyProps);
                        Collection segments = segmentManager.getRequireRemotePersistSegments();
                        if (segments.isEmpty()) continue;
                        logger.info("found cube {} segments:{} are immutable, retention policy is: {}", new Object[]{cubeName, segments, retentionPolicyInfo.getName()});
                        StreamingServer.this.handleImmutableCubeSegments(cubeName, segmentManager, segments, retentionPolicyInfo);
                    }
                    catch (Exception e) {
                        logger.error("error when handle cube:" + cubeName, (Throwable)e);
                    }
                }
            }
        }, 60L, 60L, TimeUnit.SECONDS);
    }

    private void handleImmutableCubeSegments(String cubeName, StreamingSegmentManager segmentManager, Collection<StreamingCubeSegment> segments, RetentionPolicyInfo retentionPolicyInfo) throws Exception {
        if ("fullBuild".equalsIgnoreCase(retentionPolicyInfo.getName())) {
            if (this.isLeader) {
                this.sendSegmentsToFullBuild(cubeName, segmentManager, segments);
            }
        } else {
            this.purgeSegments(cubeName, segments, retentionPolicyInfo.getProperties());
        }
    }

    private void sendSegmentsToFullBuild(String cubeName, StreamingSegmentManager segmentManager, Collection<StreamingCubeSegment> segments) throws Exception {
        ArrayList futureList = Lists.newArrayList();
        for (StreamingCubeSegment segment : segments) {
            String segmentHDFSPath = HDFSUtil.getStreamingSegmentFilePath((String)cubeName, (String)segment.getSegmentName()) + "/" + this.replicaSetID;
            SegmentHDFSFlusher flusher = new SegmentHDFSFlusher(segment, segmentHDFSPath);
            futureList.add(this.segmentFlushExecutor.submit(flusher));
        }
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        CubeInstance cubeInstance = CubeManager.getInstance((KylinConfig)kylinConfig).getCube(cubeName);
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource((ISourceAware)cubeInstance);
        int i = 0;
        for (StreamingCubeSegment segment : segments) {
            ((Future)futureList.get(i)).get();
            logger.info("Save remote store state to metadata store.");
            this.streamMetadataStore.addCompleteReplicaSetForSegmentBuild(segment.getCubeName(), segment.getSegmentName(), this.replicaSetID);
            logger.info("save remote checkpoint to metadata store");
            ISourcePosition smallestSourcePosition = segmentManager.getSmallestSourcePosition(segment);
            String smallestSourcePosStr = streamingSource.getSourcePositionHandler().serializePosition(smallestSourcePosition);
            this.streamMetadataStore.saveSourceCheckpoint(segment.getCubeName(), segment.getSegmentName(), this.replicaSetID, smallestSourcePosStr);
            logger.info("Send notification to coordinator for cube {} segment {}.", (Object)cubeName, (Object)segment.getSegmentName());
            this.coordinatorClient.segmentRemoteStoreComplete(this.currentNode, segment.getCubeName(), new Pair((Object)segment.getDateRangeStart(), (Object)segment.getDateRangeEnd()));
            logger.info("Send notification success.");
            segment.saveState(StreamingCubeSegment.State.REMOTE_PERSISTED);
            logger.info("Commit cube {} segment {}  status converted to {}.", new Object[]{segment.getCubeName(), segment.getSegmentName(), StreamingCubeSegment.State.REMOTE_PERSISTED.name()});
            ++i;
        }
    }

    private void purgeSegments(String cubeName, Collection<StreamingCubeSegment> segments, Map<String, String> properties) {
        long retentionTimeInSec = Long.valueOf(properties.get("retentionTimeInSec"));
        boolean hasPurgedSegment = false;
        for (StreamingCubeSegment segment : segments) {
            long liveTime = System.currentTimeMillis() - segment.getCreateTime();
            if (retentionTimeInSec * 1000L >= liveTime) continue;
            logger.info("purge segment:{}", (Object)segment);
            this.getStreamingSegmentManager(cubeName).purgeSegment(segment.getSegmentName());
            hasPurgedSegment = true;
        }
        if (hasPurgedSegment) {
            this.resumeConsumerIfPaused(cubeName);
        }
    }

    private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                logger.info("start to shut down streaming receiver");
                for (Map.Entry consumerEntry : StreamingServer.this.cubeConsumerMap.entrySet()) {
                    logger.info("start to stop consumer for cube:{}", consumerEntry.getKey());
                    StreamingConsumerChannel consumer = (StreamingConsumerChannel)consumerEntry.getValue();
                    consumer.stop(10000L);
                    logger.info("finish to stop consumer for cube:{}", consumerEntry.getKey());
                }
                logger.info("streaming receiver shut down successfully");
            }
        });
    }

    private void startMetrics() {
        StreamingMetrics.getInstance().start();
    }

    private ReplicaSet findBelongReplicaSet() {
        List replicaSets = this.streamMetadataStore.getReplicaSets();
        for (ReplicaSet rs : replicaSets) {
            if (!rs.containPhysicalNode(this.currentNode)) continue;
            return rs;
        }
        return null;
    }

    private void registerReceiver() throws Exception {
        logger.info("register receiver: {}", (Object)this.currentNode);
        this.streamMetadataStore.addReceiver(this.currentNode);
    }

    private void joinReplicaSetLeaderElection(int replicaSetID) {
        this.leaderSelector = new ReplicaSetLeaderSelector(this.streamZKClient, this.currentNode, replicaSetID);
        this.leaderSelector.addLeaderChangeListener(this);
        this.leaderSelector.start();
    }

    public synchronized void assign(Map<String, List<Partition>> cubeAssignment) {
        this.assignments.putAll(cubeAssignment);
    }

    public synchronized void assign(String cubeName, List<Partition> partitions) {
        this.assignments.put(cubeName, partitions);
    }

    public synchronized void unAssign(String cubeName) {
        this.stopConsumer(cubeName);
        this.assignments.remove(cubeName);
        this.removeCubeData(cubeName);
    }

    public synchronized void startConsumers(List<String> cubes) {
        for (String cube : cubes) {
            this.startConsumer(cube, null);
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public synchronized void startConsumer(String cubeName, ConsumerStartProtocol startProtocol) {
        List<Partition> partitions = this.assignments.get(cubeName);
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(cubeName);
        if (consumer != null) {
            List consumingPartitions = consumer.getConsumePartitions();
            Collections.sort(partitions);
            Collections.sort(consumingPartitions);
            if (partitions.equals(consumingPartitions)) {
                logger.info("The consumer for cube:{} is already running, skip starting", (Object)cubeName);
                return;
            }
            String msg = String.format(Locale.ROOT, "The running consumer for cube:%s partition:%s is conflict with assign partitions:%s, should stop the consumer first.", cubeName, consumingPartitions, partitions);
            throw new IllegalStateException(msg);
        }
        if (partitions == null || partitions.isEmpty()) {
            logger.info("partitions is empty for cube:{}", (Object)cubeName);
            return;
        }
        logger.info("create and start new consumer for cube:{}", (Object)cubeName);
        try {
            this.reloadCubeMetadata(cubeName);
            StreamingConsumerChannel newConsumer = this.createNewConsumer(cubeName, partitions, startProtocol);
            newConsumer.start();
            return;
        }
        catch (Exception e) {
            logger.error("consumer start fail for cube:" + cubeName, (Throwable)e);
        }
    }

    public synchronized ConsumerStatsResponse stopConsumer(String cube) {
        logger.info("stop consumers for cube: {}", (Object)cube);
        ConsumerStatsResponse response = new ConsumerStatsResponse();
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(cube);
        if (consumer != null) {
            consumer.stop(10000L);
            this.cubeConsumerMap.remove(cube);
            response.setCubeName(cube);
            response.setConsumePosition(consumer.getSourceConsumeInfo());
        }
        return response;
    }

    public synchronized void stopAllConsumers() {
        ArrayList cubes = Lists.newArrayList(this.cubeConsumerMap.keySet());
        for (String cube : cubes) {
            this.stopConsumer(cube);
        }
    }

    public synchronized ConsumerStatsResponse pauseConsumer(String cubeName) {
        logger.info("pause consumers for cube: {}", (Object)cubeName);
        ConsumerStatsResponse response = new ConsumerStatsResponse();
        response.setCubeName(cubeName);
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(cubeName);
        if (consumer != null) {
            consumer.pause(true);
            response.setConsumePosition(consumer.getSourceConsumeInfo());
        } else {
            logger.warn("the consumer for cube:{} does not exist ", (Object)cubeName);
        }
        return response;
    }

    public synchronized ConsumerStatsResponse resumeConsumer(String cubeName, String resumeToPosition) {
        logger.info("resume consumers for cube: {}", (Object)cubeName);
        ConsumerStatsResponse response = new ConsumerStatsResponse();
        response.setCubeName(cubeName);
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(cubeName);
        if (consumer == null) {
            logger.warn("the consumer for cube:{} does not exist", (Object)cubeName);
            return response;
        }
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        CubeInstance cube = CubeManager.getInstance((KylinConfig)kylinConfig).getCube(cubeName);
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource((ISourceAware)cube);
        if (resumeToPosition != null && !resumeToPosition.isEmpty()) {
            EndPositionStopCondition stopCondition = new EndPositionStopCondition(streamingSource.getSourcePositionHandler().parsePosition(resumeToPosition));
            consumer.resumeToStopCondition((IStopConsumptionCondition)stopCondition);
            this.cubeConsumerMap.remove(cubeName);
        } else {
            consumer.resume();
        }
        response.setConsumePosition(consumer.getSourceConsumeInfo());
        return response;
    }

    public void addToReplicaSet(int replicaSetID) {
        logger.info("add the node to the replicaSet:{}, join the group leader election.", (Object)replicaSetID);
        if (this.replicaSetID == replicaSetID) {
            logger.info("the receiver already in the replica set:{}, return", (Object)replicaSetID);
            return;
        }
        if (this.replicaSetID != -1) {
            throw new IllegalStateException("the receiver is in replica set:" + this.replicaSetID + ", please remove first");
        }
        this.replicaSetID = replicaSetID;
        this.joinReplicaSetLeaderElection(replicaSetID);
        Map nodeAssignments = this.streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID);
        if (nodeAssignments != null) {
            this.assign(nodeAssignments);
            ArrayList assignedCubes = Lists.newArrayList(nodeAssignments.keySet());
            this.initLocalSegmentManager(assignedCubes);
            this.startConsumers(assignedCubes);
        } else {
            this.initLocalSegmentManager(Lists.newArrayList());
        }
    }

    public void removeFromReplicaSet() {
        if (this.leaderSelector != null) {
            try {
                this.leaderSelector.close();
            }
            catch (Exception e) {
                logger.error("error happens when close leader selector", (Throwable)e);
            }
        }
        this.replicaSetID = -1;
        this.isLeader = false;
        this.assignments.clear();
        this.stopAllConsumers();
        ArrayList cubes = Lists.newArrayList(this.streamingSegmentManagerMap.keySet());
        for (String cube : cubes) {
            this.removeCubeData(cube);
        }
    }

    public ReceiverStats getReceiverStats() {
        ReceiverStats stats = new ReceiverStats();
        stats.setAssignments(this.assignments);
        stats.setLead(this.isLeader);
        HashSet allCubes = Sets.newHashSet();
        allCubes.addAll(this.assignments.keySet());
        allCubes.addAll(this.cubeConsumerMap.keySet());
        allCubes.addAll(this.streamingSegmentManagerMap.keySet());
        for (String cube : allCubes) {
            stats.addCubeStats(cube, this.getCubeStats(cube));
        }
        stats.setCacheStats(ColumnarStoreCache.getInstance().getCacheStats());
        return stats;
    }

    public ReceiverCubeStats getCubeStats(String cubeName) {
        StreamingSegmentManager segmentManager;
        ReceiverCubeStats receiverCubeStats = new ReceiverCubeStats();
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(cubeName);
        if (consumer != null) {
            receiverCubeStats.setConsumerStats(consumer.getConsumerStats());
        }
        if ((segmentManager = this.streamingSegmentManagerMap.get(cubeName)) != null) {
            Map segmentStatsMap = segmentManager.getSegmentStats();
            receiverCubeStats.setSegmentStatsMap(segmentStatsMap);
            receiverCubeStats.setTotalIngest(segmentManager.getIngestCount());
            receiverCubeStats.setLatestEventTime(StreamingSegmentManager.resetTimestampByTimeZone((long)segmentManager.getLatestEventTime()));
            receiverCubeStats.setLatestEventIngestTime(StreamingSegmentManager.resetTimestampByTimeZone((long)segmentManager.getLatestEventIngestTime()));
            receiverCubeStats.setLongLatencyInfo(segmentManager.getLongLatencyInfo());
        }
        return receiverCubeStats;
    }

    public void makeCubeImmutable(String cubeName) {
        StreamingSegmentManager segmentManager;
        if (this.cubeConsumerMap.containsKey(cubeName)) {
            logger.info("before make cube immutable, stop consumer for cube:{}", (Object)cubeName);
            StreamingConsumerChannel consumer = this.cubeConsumerMap.get(cubeName);
            consumer.stop(10000L);
            this.cubeConsumerMap.remove(cubeName);
        }
        if ((segmentManager = this.streamingSegmentManagerMap.get(cubeName)) == null) {
            return;
        }
        segmentManager.makeAllSegmentsImmutable();
    }

    public void makeCubeSegmentImmutable(String cubeName, String segmentName) {
        StreamingSegmentManager cubeStore = this.streamingSegmentManagerMap.get(cubeName);
        if (cubeStore == null) {
            return;
        }
        cubeStore.makeSegmentImmutable(segmentName);
    }

    public void remoteSegmentBuildComplete(String cubeName, String segmentName) {
        StreamingSegmentManager segmentManager = this.getStreamingSegmentManager(cubeName);
        List removedSegments = segmentManager.remoteSegmentBuildComplete(segmentName);
        if (!removedSegments.isEmpty()) {
            this.resumeConsumerIfPaused(cubeName);
        }
    }

    private void resumeConsumerIfPaused(String cubeName) {
        StreamingConsumerChannel consumer = this.getConsumer(cubeName);
        if (consumer == null || !consumer.isPaused()) {
            return;
        }
        StreamingCubeConsumeState consumeState = this.streamMetadataStore.getStreamingCubeConsumeState(cubeName);
        if (consumeState == null || consumeState == StreamingCubeConsumeState.RUNNING) {
            logger.info("resume the cube consumer:{} after remove some local immutable segments", (Object)cubeName);
            consumer.resume();
        }
    }

    private StreamingConsumerChannel createNewConsumer(String cubeName, List<Partition> partitions, ConsumerStartProtocol startProtocol) throws IOException {
        StreamingCubeConsumeState consumeState;
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        CubeInstance cube = CubeManager.getInstance((KylinConfig)kylinConfig).getCube(cubeName);
        StreamingSegmentManager segmentManager = this.getStreamingSegmentManager(cubeName);
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource((ISourceAware)cube);
        IStreamingConnector streamingConnector = streamingSource.createStreamingConnector(cubeName, partitions, startProtocol, segmentManager);
        StreamingConsumerChannel consumer = new StreamingConsumerChannel(cubeName, streamingConnector, segmentManager, IStopConsumptionCondition.NEVER_STOP);
        long minAcceptEventTime = cube.getDescriptor().getPartitionDateStart();
        CubeSegment latestRemoteSegment = cube.getLatestReadySegment();
        if (latestRemoteSegment != null) {
            minAcceptEventTime = (Long)latestRemoteSegment.getTSRange().end.v;
        }
        if (minAcceptEventTime > 0L && minAcceptEventTime < System.currentTimeMillis()) {
            consumer.setMinAcceptEventTime(minAcceptEventTime);
        }
        if ((consumeState = this.streamMetadataStore.getStreamingCubeConsumeState(cubeName)) != null && consumeState == StreamingCubeConsumeState.PAUSED) {
            consumer.pause(false);
        }
        this.cubeConsumerMap.put(cubeName, consumer);
        return consumer;
    }

    @Override
    public void becomeLeader() {
        if (this.replicaSetID != -1) {
            logger.info("become leader of the replicaSet:{}", (Object)this.replicaSetID);
            try {
                ReplicaSet rs = this.streamMetadataStore.getReplicaSet(this.replicaSetID);
                rs.setLeader(this.currentNode);
                this.streamMetadataStore.updateReplicaSet(rs);
                this.coordinatorClient.replicaSetLeaderChange(this.replicaSetID, this.currentNode);
            }
            catch (Exception e) {
                logger.error("error when send lead change notification to coordinator", (Throwable)e);
            }
        }
        this.isLeader = true;
    }

    @Override
    public void becomeFollower() {
        this.isLeader = false;
        if (this.replicaSetID != -1) {
            logger.info("become follower of the replicaSet:{}", (Object)this.replicaSetID);
        }
    }

    public StreamingConsumerChannel getConsumer(String cubeName) {
        return this.cubeConsumerMap.get(cubeName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public StreamingSegmentManager getStreamingSegmentManager(String cubeName) {
        if (this.streamingSegmentManagerMap.get(cubeName) == null) {
            Map<String, StreamingSegmentManager> map = this.streamingSegmentManagerMap;
            synchronized (map) {
                if (this.streamingSegmentManagerMap.get(cubeName) == null) {
                    CubeInstance cubeInstance = CubeManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv()).getCube(cubeName);
                    ISourcePositionHandler sourcePositionHandler = StreamingSourceFactory.getStreamingSource((ISourceAware)cubeInstance).getSourcePositionHandler();
                    StreamingSegmentManager segmentManager = new StreamingSegmentManager(this.baseStorePath, cubeInstance, sourcePositionHandler, (IConsumerProvider)this);
                    this.streamingSegmentManagerMap.put(cubeName, segmentManager);
                }
            }
        }
        return this.streamingSegmentManagerMap.get(cubeName);
    }

    public void removeCubeData(String cubeName) {
        logger.info("remove cube data: {}", (Object)cubeName);
        StreamingSegmentManager segmentManager = this.getStreamingSegmentManager(cubeName);
        if (segmentManager != null) {
            this.streamingSegmentManagerMap.remove(cubeName);
            segmentManager.close();
            segmentManager.purgeAllSegments();
        }
    }

    public void reSubmitCubeSegment(String cubeName, String segmentName) {
        StreamingSegmentManager segmentManager = this.getStreamingSegmentManager(cubeName);
        StreamingCubeSegment segment = segmentManager.getSegmentByName(segmentName);
        if (segment == null) {
            throw new IllegalStateException("cannot find segment:" + segmentName);
        }
        if (segment.isActive()) {
            throw new IllegalStateException("the segment must be immutable:" + segment);
        }
        String segmentHDFSPath = HDFSUtil.getStreamingSegmentFilePath((String)cubeName, (String)segmentName) + "/" + this.replicaSetID;
        SegmentHDFSFlusher flusher = new SegmentHDFSFlusher(segment, segmentHDFSPath);
        try {
            flusher.flushToHDFS();
        }
        catch (IOException e) {
            throw new RuntimeException("fail to copy segment to hdfs:" + segment, e);
        }
    }

    public Collection<StreamingSegmentManager> getAllCubeSegmentManagers() {
        return this.streamingSegmentManagerMap.values();
    }

    private void initLocalSegmentManager(List<String> assignedCubes) {
        File[] subFolders;
        File baseFolder = new File(this.baseStorePath);
        if (!baseFolder.exists()) {
            baseFolder.mkdirs();
        }
        for (File cubeFolder : subFolders = baseFolder.listFiles(new FileFilter(){

            @Override
            public boolean accept(File file) {
                return file.isDirectory();
            }
        })) {
            String cubeName = cubeFolder.getName();
            if (!assignedCubes.contains(cubeName)) {
                logger.info("remove the cube:{} data, because it is not assigned to this node", (Object)cubeName);
                try {
                    FileUtils.deleteDirectory((File)cubeFolder);
                }
                catch (IOException e) {
                    logger.error("error happens when remove cube folder", (Throwable)e);
                }
                continue;
            }
            try {
                StreamingSegmentManager segmentManager = this.getStreamingSegmentManager(cubeName);
                segmentManager.restoreSegmentsFromLocal();
            }
            catch (Exception e) {
                logger.error("local cube store init fail", (Throwable)e);
            }
        }
    }

    private void reloadCubeMetadata(String cubeName) throws IOException {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        ResourceStore resourceStore = ResourceStore.getStore((KylinConfig)kylinConfig);
        CubeInstance rawCubeInstance = (CubeInstance)resourceStore.getResource(CubeInstance.concatResourcePath((String)cubeName), CubeManager.CUBE_SERIALIZER);
        CubeDesc rawCubeDesc = (CubeDesc)resourceStore.getResource(CubeDesc.concatResourcePath((String)rawCubeInstance.getDescName()), CubeDescManager.CUBE_DESC_SERIALIZER);
        DataModelDesc rawModel = (DataModelDesc)resourceStore.getResource(DataModelDesc.concatResourcePath((String)rawCubeDesc.getModelName()), (Serializer)new JsonSerializer(DataModelDesc.class));
        ProjectManager projectManager = ProjectManager.getInstance((KylinConfig)kylinConfig);
        List projects = projectManager.findProjectsByModel(rawModel.getName());
        if (projects.isEmpty()) {
            projectManager.reloadAll();
            projects = projectManager.findProjectsByModel(rawModel.getName());
        }
        if (projects.size() != 1) {
            throw new IllegalArgumentException("the cube:" + cubeName + " is not in any project");
        }
        TableMetadataManager.getInstance((KylinConfig)kylinConfig).reloadSourceTableQuietly(rawModel.getRootFactTableName(), ((ProjectInstance)projects.get(0)).getName());
        DataModelManager.getInstance((KylinConfig)kylinConfig).reloadDataModel(rawModel.getName());
        CubeDescManager.getInstance((KylinConfig)kylinConfig).reloadCubeDescLocal(cubeName);
        CubeInstance cubeInstance = CubeManager.getInstance((KylinConfig)kylinConfig).reloadCubeQuietly(cubeName);
        StreamingSourceConfigManager.getInstance((KylinConfig)kylinConfig).reloadStreamingConfigLocal(cubeInstance.getRootFactTable(), cubeInstance.getProject());
    }

    private String calLocalSegmentCacheDir() {
        String localSegmentCachePath;
        String kylinHome = KylinConfig.getKylinHome();
        String indexPathStr = KylinConfig.getInstanceFromEnv().getStreamingIndexPath();
        File indexPath = new File(indexPathStr);
        if (indexPath.isAbsolute()) {
            localSegmentCachePath = indexPathStr;
        } else if (kylinHome != null && !kylinHome.equals("")) {
            File localSegmentFile = new File(kylinHome, indexPathStr);
            localSegmentCachePath = localSegmentFile.getAbsolutePath();
        } else {
            localSegmentCachePath = indexPathStr;
        }
        logger.info("Using {} to store local segment cache.", (Object)localSegmentCachePath);
        return localSegmentCachePath;
    }

    private static class SegmentHDFSFlusher
    implements Runnable {
        private final Logger logger = LoggerFactory.getLogger(SegmentHDFSFlusher.class);
        private StreamingCubeSegment segment;
        private String hdfsPath;

        public SegmentHDFSFlusher(StreamingCubeSegment segment, String hdfsPath) {
            this.segment = segment;
            this.hdfsPath = hdfsPath;
        }

        public void flushToHDFS() throws IOException {
            FileStatus sdst;
            this.logger.info("start to flush cube:{} segment:{} to hdfs:{}", new Object[]{this.segment.getCubeName(), this.segment.getSegmentName(), this.hdfsPath});
            FileSystem fs = HadoopUtil.getFileSystem((String)this.hdfsPath);
            String localPath = this.segment.getDataSegmentFolder().getPath();
            Path remotePath = new Path(this.hdfsPath);
            if (fs.exists(remotePath)) {
                this.logger.info("the remote path:{} is already exist, skip copy data to remote", (Object)remotePath);
                return;
            }
            Path remoteTempPath = new Path(this.hdfsPath + ".tmp");
            if (fs.exists(remoteTempPath) && (sdst = fs.getFileStatus(remoteTempPath)).isDirectory()) {
                this.logger.warn("target temp path: {} is an existed directory, try to delete it.", (Object)remoteTempPath);
                fs.delete(remoteTempPath, true);
                this.logger.warn("target temp path: {} is deleted.", (Object)remoteTempPath);
            }
            fs.copyFromLocalFile(new Path(localPath), remoteTempPath);
            this.logger.info("data copy to remote temp path:{}", (Object)remoteTempPath);
            boolean renamed = fs.rename(remoteTempPath, remotePath);
            if (renamed) {
                this.logger.info("successfully rename the temp path to:{}", (Object)remotePath);
            }
        }

        @Override
        public void run() {
            try {
                this.flushToHDFS();
            }
            catch (Exception e) {
                this.logger.error("error when flush segment data to hdfs", (Throwable)e);
                throw new IllegalStateException(e);
            }
        }
    }
}

