Skip to content

Commit 70a7aaa

Browse files
kcacademic authored and pivovarit committed
Kafka spark cassandra (eugenp#6089)
* Adding files for the tutorial BAEL-2301
* Incorporating review comments on the article.
* Incorporated additional review comments on the article.
1 parent 128817c commit 70a7aaa

File tree

2 files changed

+12
-25
lines changed

2 files changed

+12
-25
lines changed

apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java

Lines changed: 6 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -14,13 +14,7 @@
1414
import org.apache.log4j.Level;
1515
import org.apache.log4j.Logger;
1616
import org.apache.spark.SparkConf;
17-
import org.apache.spark.api.java.JavaPairRDD;
1817
import org.apache.spark.api.java.JavaRDD;
19-
import org.apache.spark.api.java.function.FlatMapFunction;
20-
import org.apache.spark.api.java.function.Function;
21-
import org.apache.spark.api.java.function.Function2;
22-
import org.apache.spark.api.java.function.PairFunction;
23-
import org.apache.spark.api.java.function.VoidFunction;
2418
import org.apache.spark.streaming.Durations;
2519
import org.apache.spark.streaming.api.java.JavaDStream;
2620
import org.apache.spark.streaming.api.java.JavaInputDStream;
@@ -59,17 +53,17 @@ public static void main(String[] args) throws InterruptedException {
5953

6054
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
6155

62-
JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
56+
JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
6357

64-
JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
58+
JavaDStream<String> lines = results.map(tuple2 -> tuple2._2());
6559

66-
JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
60+
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+"))
6761
.iterator());
6862

69-
JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
70-
.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
63+
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
64+
.reduceByKey((i1, i2) -> i1 + i2);
7165

72-
wordCounts.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) javaRdd -> {
66+
wordCounts.foreachRDD(javaRdd -> {
7367
Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
7468
for (String key : wordCountMap.keySet()) {
7569
List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));

apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java

Lines changed: 6 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -16,15 +16,8 @@
1616
import org.apache.spark.SparkConf;
1717
import org.apache.spark.api.java.JavaRDD;
1818
import org.apache.spark.api.java.JavaSparkContext;
19-
import org.apache.spark.api.java.Optional;
20-
import org.apache.spark.api.java.function.FlatMapFunction;
21-
import org.apache.spark.api.java.function.Function;
2219
import org.apache.spark.api.java.function.Function2;
23-
import org.apache.spark.api.java.function.Function3;
24-
import org.apache.spark.api.java.function.PairFunction;
25-
import org.apache.spark.api.java.function.VoidFunction;
2620
import org.apache.spark.streaming.Durations;
27-
import org.apache.spark.streaming.State;
2821
import org.apache.spark.streaming.StateSpec;
2922
import org.apache.spark.streaming.api.java.JavaDStream;
3023
import org.apache.spark.streaming.api.java.JavaInputDStream;
@@ -71,24 +64,24 @@ public static void main(String[] args) throws InterruptedException {
7164

7265
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
7366

74-
JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
67+
JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
7568

76-
JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
69+
JavaDStream<String> lines = results.map(tuple2 -> tuple2._2());
7770

78-
JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
71+
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+"))
7972
.iterator());
8073

81-
JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
74+
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
8275
.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
8376

84-
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>) (word, one, state) -> {
77+
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((word, one, state) -> {
8578
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
8679
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
8780
state.update(sum);
8881
return output;
8982
}));
9083

91-
cumulativeWordCounts.foreachRDD((VoidFunction<JavaRDD<Tuple2<String, Integer>>>) javaRdd -> {
84+
cumulativeWordCounts.foreachRDD(javaRdd -> {
9285
List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
9386
for (Tuple2<String, Integer> tuple : wordCountList) {
9487
List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));

0 commit comments

Comments (0)