|
16 | 16 | import org.apache.spark.SparkConf; |
17 | 17 | import org.apache.spark.api.java.JavaRDD; |
18 | 18 | import org.apache.spark.api.java.JavaSparkContext; |
19 | | -import org.apache.spark.api.java.Optional; |
20 | | -import org.apache.spark.api.java.function.FlatMapFunction; |
21 | | -import org.apache.spark.api.java.function.Function; |
22 | 19 | import org.apache.spark.api.java.function.Function2; |
23 | | -import org.apache.spark.api.java.function.Function3; |
24 | | -import org.apache.spark.api.java.function.PairFunction; |
25 | | -import org.apache.spark.api.java.function.VoidFunction; |
26 | 20 | import org.apache.spark.streaming.Durations; |
27 | | -import org.apache.spark.streaming.State; |
28 | 21 | import org.apache.spark.streaming.StateSpec; |
29 | 22 | import org.apache.spark.streaming.api.java.JavaDStream; |
30 | 23 | import org.apache.spark.streaming.api.java.JavaInputDStream; |
@@ -71,24 +64,24 @@ public static void main(String[] args) throws InterruptedException { |
71 | 64 |
|
72 | 65 | JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams)); |
73 | 66 |
|
74 | | - JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value())); |
| 67 | + JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value())); |
75 | 68 |
|
76 | | - JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2()); |
| 69 | + JavaDStream<String> lines = results.map(tuple2 -> tuple2._2()); |
77 | 70 |
|
78 | | - JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+")) |
| 71 | + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+")) |
79 | 72 | .iterator()); |
80 | 73 |
|
81 | | - JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)) |
| 74 | + JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) |
82 | 75 | .reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2); |
83 | 76 |
|
84 | | - JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>) (word, one, state) -> { |
| 77 | + JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((word, one, state) -> { |
85 | 78 | int sum = one.orElse(0) + (state.exists() ? state.get() : 0); |
86 | 79 | Tuple2<String, Integer> output = new Tuple2<>(word, sum); |
87 | 80 | state.update(sum); |
88 | 81 | return output; |
89 | 82 | })); |
90 | 83 |
|
91 | | - cumulativeWordCounts.foreachRDD((VoidFunction<JavaRDD<Tuple2<String, Integer>>>) javaRdd -> { |
| 84 | + cumulativeWordCounts.foreachRDD(javaRdd -> { |
92 | 85 | List<Tuple2<String, Integer>> wordCountList = javaRdd.collect(); |
93 | 86 | for (Tuple2<String, Integer> tuple : wordCountList) { |
94 | 87 | List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2)); |
|
0 commit comments