/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.kite.policy;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.sink.kite.policy.FailurePolicy;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.View;

public class SavePolicy
implements FailurePolicy {
    private final View<AvroFlumeEvent> dataset;
    private DatasetWriter<AvroFlumeEvent> writer;
    private int nEventsHandled;

    private SavePolicy(Context context) {
        String uri = context.getString("kite.error.dataset.uri");
        Preconditions.checkArgument((uri != null ? 1 : 0) != 0, (Object)"Must set kite.error.dataset.uri when kite.failurePolicy=save");
        if (Datasets.exists((String)uri)) {
            this.dataset = Datasets.load((String)uri, AvroFlumeEvent.class);
        } else {
            DatasetDescriptor descriptor = new DatasetDescriptor.Builder().schema(AvroFlumeEvent.class).build();
            this.dataset = Datasets.create((String)uri, (DatasetDescriptor)descriptor, AvroFlumeEvent.class);
        }
        this.nEventsHandled = 0;
    }

    @Override
    public void handle(Event event, Throwable cause) throws EventDeliveryException {
        try {
            if (this.writer == null) {
                this.writer = this.dataset.newWriter();
            }
            AvroFlumeEvent avroEvent = new AvroFlumeEvent();
            avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
            avroEvent.setHeaders(SavePolicy.toCharSeqMap(event.getHeaders()));
            this.writer.write((Object)avroEvent);
            ++this.nEventsHandled;
        }
        catch (RuntimeException ex) {
            throw new EventDeliveryException((Throwable)ex);
        }
    }

    @Override
    public void sync() throws EventDeliveryException {
        if (this.nEventsHandled > 0) {
            if (Formats.PARQUET.equals((Object)this.dataset.getDataset().getDescriptor().getFormat())) {
                this.close();
            } else if (this.writer instanceof Syncable) {
                ((Syncable)this.writer).sync();
            }
        }
    }

    @Override
    public void close() throws EventDeliveryException {
        if (this.nEventsHandled > 0) {
            try {
                this.writer.close();
            }
            catch (RuntimeException ex) {
                throw new EventDeliveryException((Throwable)ex);
            }
            finally {
                this.writer = null;
                this.nEventsHandled = 0;
            }
        }
    }

    private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> map) {
        return Maps.newHashMap(map);
    }

    public static class Builder
    implements FailurePolicy.Builder {
        @Override
        public FailurePolicy build(Context config) {
            return new SavePolicy(config);
        }
    }
}

