package com.odianyun.horse.spark.realtime.dstream;

import com.google.gson.JsonObject;
import com.odianyun.horse.spark.common.SparkSessionBuilder$;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils$;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaDStreamCreater.scala */
/* loaded from: input_file:com/odianyun/horse/spark/realtime/dstream/KafkaDStreamCreater$$anonfun$3.class */
public final class KafkaDStreamCreater$$anonfun$3 extends AbstractFunction1<String, StreamingContext> implements Serializable {
    public static final long serialVersionUID = 0;
    private final KafkaDStreamCreaterConfig kafkaDStreamCreaterConfig$1;
    private final Function1 calcFunc$1;

    public final StreamingContext apply(String str) {
        StreamingContext streamingContext = new StreamingContext(SparkSessionBuilder$.MODULE$.build(KafkaDStreamCreater$.MODULE$.getClass().getSimpleName()).sparkContext(), Seconds$.MODULE$.apply(this.kafkaDStreamCreaterConfig$1.interval_seconds()));
        streamingContext.checkpoint(str);
        Map<String, String> kafkaParams = this.kafkaDStreamCreaterConfig$1.kafkaParams();
        Set<String> set = this.kafkaDStreamCreaterConfig$1.topics();
        Predef$.MODULE$.println(new StringBuilder().append("topic: ").append(set).toString());
        Predef$.MODULE$.println(new StringBuilder().append("kafkaParams: ").append(kafkaParams).toString());
        try {
            InputDStream createDirectStream = KafkaUtils$.MODULE$.createDirectStream(streamingContext, kafkaParams, set, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class));
            createDirectStream.checkpoint(Seconds$.MODULE$.apply(this.kafkaDStreamCreaterConfig$1.interval_seconds() * 5));
            this.calcFunc$1.apply(createDirectStream.flatMap(new KafkaDStreamCreater$$anonfun$3$$anonfun$4(this), ClassTag$.MODULE$.apply(JsonObject.class)));
        } catch (Exception e) {
            Predef$.MODULE$.println(e.getMessage());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        return streamingContext;
    }

    public KafkaDStreamCreater$$anonfun$3(KafkaDStreamCreaterConfig kafkaDStreamCreaterConfig, Function1 function1) {
        this.kafkaDStreamCreaterConfig$1 = kafkaDStreamCreaterConfig;
        this.calcFunc$1 = function1;
    }
}
