package com.odianyun.horse.spark.realtime;

import com.google.gson.JsonObject;
import com.odianyun.horse.common.util.KafkaSender;
import com.odianyun.horse.spark.common.GsonUtil$;
import com.odianyun.horse.spark.model.JobContext;
import com.odianyun.horse.spark.realtime.dstream.DStreamManager$;
import com.odianyun.horse.spark.realtime.dstream.DStreamSource;
import com.odianyun.horse.spark.realtime.dstream.DStreamType$;
import com.odianyun.horse.spark.realtime.dstream.DStreamWrapper;
import org.apache.spark.streaming.dstream.DStream;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: AbstractRealTimeKafkaStreaming.scala */
@ScalaSignature(bytes = "\u0006\u0001q4Q!\u0001\u0002\u0002\u00025\u0011a$\u00112tiJ\f7\r\u001e*fC2$\u0016.\\3LC\u001a\\\u0017m\u0015;sK\u0006l\u0017N\\4\u000b\u0005\r!\u0011\u0001\u0003:fC2$\u0018.\\3\u000b\u0005\u00151\u0011!B:qCJ\\'BA\u0004\t\u0003\u0015AwN]:f\u0015\tI!\"\u0001\u0005pI&\fg._;o\u0015\u0005Y\u0011aA2p[\u000e\u0001Qc\u0001\b+uM\u0019\u0001aD\u000b\u0011\u0005A\u0019R\"A\t\u000b\u0003I\tQa]2bY\u0006L!\u0001F\t\u0003\r\u0005s\u0017PU3g!\t\u0001b#\u0003\u0002\u0018#\ta1+\u001a:jC2L'0\u00192mK\"A\u0011\u0004\u0001B\u0001B\u0003%!$\u0001\u0006k_\n\u001cuN\u001c;fqR\u0004\"a\u0007\u0010\u000e\u0003qQ!!\b\u0003\u0002\u000b5|G-\u001a7\n\u0005}a\"A\u0003&pE\u000e{g\u000e^3yi\"A\u0011\u0005\u0001B\u0002B\u0003-!%\u0001\u0006fm&$WM\\2fIE\u00022a\t\u0014)\u001b\u0005!#BA\u0013\u0012\u0003\u001d\u0011XM\u001a7fGRL!a\n\u0013\u0003\u0011\rc\u0017m]:UC\u001e\u0004\"!\u000b\u0016\r\u0001\u0011)1\u0006\u0001b\u0001Y\t\tA+\u0005\u0002.aA\u0011\u0001CL\u0005\u0003_E\u0011qAT8uQ&tw\r\u0005\u0002\u0011c%\u0011!'\u0005\u0002\u0004\u0003:L\b\"\u0002\u001b\u0001\t\u0003)\u0014A\u0002\u001fj]&$h\b\u0006\u00027{Q\u0011q\u0007\u0010\t\u0005q\u0001A\u0013(D\u0001\u0003!\tI#\bB\u0003<\u0001\t\u0007AFA\u0001V\u0011\u0015\t3\u0007q\u0001#\u0011\u0015I2\u00071\u0001\u001b\u0011\u0015y\u0004A\"\u0001A\u000399WM\\*pkJ\u001cW\rV8qS\u000e$\u0012!\u0011\t\u0003\u0005\u0016s!\u0001E\"\n\u0005\u0011\u000b\u0012A\u0002)sK\u0012,g-\u0003\u0002G\u000f\n11\u000b\u001e:j]\u001eT!\u0001R\t\t\u000b%\u0003a\u0011\u0001!\u0002\u0019\u001d,gnU5oWR{\u0007/[2\t\u000b-\u0003a\u0011\u0001'\u0002\u0013Q\u0014\u0018M\\:g_JlGCA'[!\rq\u0005,O\u0007\u0002\u001f*\u0011\u0001+U\u0001\bIN$(/Z1n\u0015\t\u00116+A\u0005tiJ,\u0017-\\5oO*\u0011Q\u0001\u0016\u0006\u0003+Z\u000ba!\u00199bG\",'\"A,\u0002\u0007=\u0014x-\u0003\u0002Z\u001f\n9Ai\u0015;sK\u0006l\u0007\"B.K\u0001\u0004a\u0016AB:ue\u0016\fW\u000eE\u0002O1v\u0003\"AX2\u000e\u0003}S!\u0001Y1\u0002\t\u001d\u001cxN\u001c\u0006\u0003E*\taaZ8pO2,\u0017B\u00013`\u
0005)Q5o\u001c8PE*,7\r\u001e\u0005\u0006M\u0002!\taZ\u0001\u0006gR\f'\u000f\u001e\u000b\u0002QB\u0011\u0001#[\u0005\u0003UF\u0011A!\u00168ji\")A\u000e\u0001C\u0001O\u0006\u00012\u000f^1siJ+7m\u001c<fe\u0006\u0014G.\u001a\u0005\u0006]\u0002!\ta\\\u0001\nI>\u0004&o\\2fgN$2\u0001\u001b9s\u0011\u0015\tX\u000e1\u0001:\u0003\u0019\u0011XmY8sI\")1/\u001ca\u0001i\u0006Y1.\u00194lCN+g\u000eZ3s!\u0011)(0Q!\u000e\u0003YT!a\u001e=\u0002\tU$\u0018\u000e\u001c\u0006\u0003s\u001a\taaY8n[>t\u0017BA>w\u0005-Y\u0015MZ6b'\u0016tG-\u001a:")
/* loaded from: input_file:com/odianyun/horse/spark/realtime/AbstractRealTimeKafkaStreaming.class */
public abstract class AbstractRealTimeKafkaStreaming<T, U> implements Serializable {
    /*
     * Base class for streaming jobs that read JsonObject records from a Kafka
     * source topic, let the subclass transform them into records of type U, and
     * offer doProcess(...) to publish a record as JSON to a sink topic.
     *
     * Subclasses supply the source topic, the sink topic and the transformation.
     * The type parameter T is only referenced by the ClassTag constructor
     * argument in this class.
     *
     * NOTE(review): the per-batch work is implemented by compiler-generated
     * closure classes (AbstractRealTimeKafkaStreaming$$anonfun$...) that are not
     * part of this file; presumably they end up calling doProcess - confirm
     * against the original Scala source.
     */

    /** Property key holding the Kafka bootstrap server list. */
    private static final String KAFKA_SERVERS_KEY = "kafka.servers";

    /** Broker list used when {@link #KAFKA_SERVERS_KEY} is not configured. */
    private static final String DEFAULT_KAFKA_SERVERS =
            "172.16.0.205:9092,172.16.0.206:9092,172.16.0.207:9092";

    /** Property key holding the checkpoint directory. */
    private static final String CHECKPOINT_DIR_KEY = "check.point.dir";

    /** Checkpoint directory used when {@link #CHECKPOINT_DIR_KEY} is not configured. */
    private static final String DEFAULT_CHECKPOINT_DIR = "";

    /**
     * Job context whose properties provide the Kafka and checkpoint settings.
     * The name-mangled, public form is kept because the compiled closure
     * classes are constructed with {@code this} and may access the field by
     * this exact name.
     */
    public final JobContext com$odianyun$horse$spark$realtime$AbstractRealTimeKafkaStreaming$$jobContext;

    /**
     * Returns the name of the Kafka topic to consume from.
     *
     * @return the source topic name
     */
    public abstract String genSourceTopic();

    /**
     * Returns the name of the Kafka topic that {@link #doProcess} sends to.
     *
     * @return the sink topic name
     */
    public abstract String genSinkTopic();

    /**
     * Converts the stream of incoming JSON records into the stream of output records.
     *
     * @param dStream stream of JSON objects obtained for the source topic
     * @return the transformed stream
     */
    public abstract DStream<U> transform(DStream<JsonObject> dStream);

    /**
     * Obtains the stream for the source topic, applies {@link #transform},
     * registers the per-batch closure via {@code foreachRDD}, and starts the stream.
     * Progress messages are printed along the way.
     */
    public void start() {
        Predef$.MODULE$.println("starting........  !!!");
        DStreamWrapper<JsonObject> dstream = DStreamManager$.MODULE$.getDstream(buildKafkaSource());
        Predef$.MODULE$.println("get stream ........  !!!");
        transform(dstream.getDStream()).foreachRDD(new AbstractRealTimeKafkaStreaming$$anonfun$start$1(this));
        dstream.start();
        Predef$.MODULE$.println("started.........  !!!");
    }

    /**
     * Variant of {@link #start()} that hands the stream set-up closure to
     * {@code DStreamManager.getDstream} together with the source instead of
     * wiring the stream up after it has been obtained.
     *
     * NOTE(review): presumably this lets the manager rebuild the pipeline when
     * recovering from the checkpoint directory - confirm in DStreamManager.
     */
    public void startRecoverable() {
        Predef$.MODULE$.println("starting........  !!!");
        Function1<DStream<JsonObject>, BoxedUnit> setupStream = new AbstractRealTimeKafkaStreaming$$anonfun$1(this);
        DStreamWrapper<JsonObject> dstream =
                DStreamManager$.MODULE$.getDstream(buildKafkaSource(), setupStream);
        Predef$.MODULE$.println("get stream ........  !!!");
        dstream.start();
        Predef$.MODULE$.println("started.........  !!!");
    }

    /**
     * Serializes one record to JSON with the {@code GsonUtil} instance and sends
     * it to the sink topic.
     *
     * @param u           the record to publish
     * @param kafkaSender sender used to deliver the JSON string
     */
    public void doProcess(U u, KafkaSender<String, String> kafkaSender) {
        // Resolve the sink topic once; the previous code called genSinkTopic()
        // twice and threw the first result away.
        String sinkTopic = genSinkTopic();
        kafkaSender.sendMessage(sinkTopic, GsonUtil$.MODULE$.getInstance().toJson(u));
    }

    /**
     * Creates the job base with the context it reads its configuration from.
     *
     * @param jobContext context providing the job properties
     * @param classTag   class tag for {@code T}; not used by this class, kept so
     *                   the constructor signature stays compatible with callers
     */
    public AbstractRealTimeKafkaStreaming(JobContext jobContext, ClassTag<T> classTag) {
        this.com$odianyun$horse$spark$realtime$AbstractRealTimeKafkaStreaming$$jobContext = jobContext;
    }

    /**
     * Resolves the source topic, prints it, and builds the Kafka source
     * descriptor from the job properties (falling back to the built-in defaults).
     */
    private DStreamSource buildKafkaSource() {
        String sourceTopic = genSourceTopic();
        Predef$.MODULE$.println("topic : " + sourceTopic);
        return new DStreamSource(
                sourceTopic,
                DStreamType$.MODULE$.Kafka(),
                this.com$odianyun$horse$spark$realtime$AbstractRealTimeKafkaStreaming$$jobContext
                        .getProperties()
                        .getProperty(KAFKA_SERVERS_KEY, DEFAULT_KAFKA_SERVERS),
                this.com$odianyun$horse$spark$realtime$AbstractRealTimeKafkaStreaming$$jobContext
                        .getProperties()
                        .getProperty(CHECKPOINT_DIR_KEY, DEFAULT_CHECKPOINT_DIR));
    }
}
