Kafka_avro_examples
java -jar avro-tools-1.8.1.jar compile schema ClickRecordV1.avsc .
This generates NewClickRecord.java (the class name comes from the record name inside the schema, not from the .avsc file name).
specific.avro.reader=true ==> consumer-side setting that tells the Confluent Avro deserializer to return the generated specific classes instead of GenericRecord; the sketch below shows where it goes.
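A minimal sketch, assuming the Confluent deserializer (io.confluent:kafka-avro-serializer) on the classpath and a local Schema Registry; the topic and group id are made up:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ClickRecordSpecificConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "clickrecord-group");   // made-up group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        // The flag from the note above: with it set, poll() returns instances of the
        // generated class (e.g. NewClickRecord) rather than GenericRecord.
        props.put("specific.avro.reader", "true");

        KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("clickstream"));  // made-up topic
        consumer.close();
    }
}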
REFERENCE LINKS:
https://github.com/aseigneurin/aseigneurin.github.io/blob/master/_posts/2016-03-04-kafka-spark-avro-producing-and-consuming-avro-messages.md
https://github.com/ScalaConsultants/spark-kafka-avro/blob/master/src/main/scala/io/scalac/spark/AvroConsumer.scala
https://github.com/mykidong/spark-kafka-simple-consumer-receiver
https://github.com/dmatrix/examples/tree/master/confluent
https://saumitra.me/blog/tweet-search-and-analysis-with-kafka-solr-cassandra/
https://saumitra.me/blog/
Use case: https://medium.com/@stephane.maarek/how-to-use-apache-kafka-to-transform-a-batch-pipeline-into-a-real-time-one-831b48a6ad85
https://github.com/LearningJournal/ApacheKafkaTutorials/blob/master/AvroProducer-V1/AvroProducer.java
https://www.youtube.com/watch?v=1OdsRuKXWbM
https://www.youtube.com/watch?v=ig1TnNcULSg
LAUNCHING CP (CONFLUENT PLATFORM) COMPONENTS:
1. Zookeeper : bin/zookeeper-server-start etc/kafka/zookeeper.properties &
2. Kafka Server : bin/kafka-server-start etc/kafka/server.properties &
3. Schema Registry : bin/schema-registry-start etc/schema-registry/schema-registry.properties &
4. InfluxDB : /usr/local/opt/influxdb/bin/influxd -config /usr/local/etc/influxdb.conf
CLI PRODUCER/CONSUMER (Blue Terminal D1)
1. Produce Messages:
./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic test \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
2. Enter messages:
{"f1": "Hello World!"}
{"f1": "Happy Holidays!"}
{"f1": "Happy Hacking!"}
3. Consume Messages:
bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning
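The same produce step in plain Java — a hedged sketch assuming the Confluent serializer (io.confluent:kafka-avro-serializer) on the classpath and the Schema Registry from the launch steps at http://localhost:8081; the class name is made up:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConsoleEquivalentProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        // Same schema the console producer passes via value.schema.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("f1", "Hello World!");

        // The serializer registers the schema with the registry on first send.
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", record));
        }
    }
}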
SIMPLE JAVA PRODUCER (Blue Terminal D1)
1. cd examples/confluent/publisher
2. mvn clean package
3. mvn exec:java -Dexec.mainClass="com.dmatrix.iot.devices.SimplePublisher" -Dexec.args="devices 5 http://localhost:8081"
4. bin/kafka-avro-console-consumer --topic devices --zookeeper localhost:2181 --from-beginning
TO USE THE JAVA CONSUMER
1. cd examples/confluent/subscriber
2. mvn clean package
3. mvn exec:java -Dexec.mainClass="com.dmatrix.iot.devices.SubscribeIoTDevices" -Dexec.args="localhost:2181 group devices 1 http://localhost:8081"
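For reference, a hedged sketch of the shape of such a subscriber (the real SubscribeIoTDevices lives in the repo linked above; the class and group names here are made up):

import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DevicesConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "devices-sketch-group");  // made-up group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("devices"));
        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);
            // Each value is decoded against the writer schema fetched from the registry.
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}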
TO USE THE JAVA SPARK STREAMING APP
1. cd examples/confluent/streaming/subscribers/java
2. mvn clean package
3. spark-submit --packages org.apache.spark:spark-streaming_2.10:1.6.0,org.apache.spark:spark-streaming-kafka_2.10:1.6.0,io.confluent:kafka-avro-serializer:1.0.1,org.apache.kafka:kafka_2.10:0.8.2.1 --repositories http://packages.confluent.io/maven/ --class com.dmatrix.iot.devices.DeviceIoTStreamApp --master local[6] target/iot-sub-spark-devices-2.0-SNAPSHOT.jar localhost:9092 devices
TO USE THE SCALA SPARK STREAMING APP
1. cd examples/confluent/streaming/subscribers/scala
2. sbt clean package
3. spark-submit --packages org.apache.spark:spark-streaming_2.10:1.6.0,org.apache.spark:spark-streaming-kafka_2.10:1.6.0,io.confluent:kafka-avro-serializer:1.0.1,org.apache.kafka:kafka_2.10:0.8.2.1,org.slf4j:slf4j-log4j12:1.6.1 --class main.scala.iot.DeviceIoTStreamApp --master local[6] target/scala-2.10/main-scala-iot_2.10-1.0.jar localhost:9092 devices humidity 60
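A hedged Java skeleton of the direct-stream wiring those submit commands rely on (Spark 1.6 / Kafka 0.8 era API from spark-streaming-kafka_2.10; the class name is made up, and full Avro decoding of the byte[] values would follow the pattern in the standalone consumer below):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class DeviceIoTStreamSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DeviceIoTStreamSketch").setMaster("local[6]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");

        Set<String> topics = new HashSet<>();
        topics.add("devices");

        // Direct stream: keys as String, values left as raw Avro bytes.
        JavaPairInputDStream<String, byte[]> stream = KafkaUtils.createDirectStream(
                jssc, String.class, byte[].class, StringDecoder.class, DefaultDecoder.class,
                kafkaParams, topics);

        // Placeholder action; the real app would deserialize values and filter
        // on a field (e.g. the "humidity 60" args passed above).
        stream.foreachRDD(rdd -> System.out.println("records in batch: " + rdd.count()));

        jssc.start();
        jssc.awaitTermination();
    }
}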
##################################################################################################
STANDALONE JAVA CONSUMER OVER SSL (each message value is a complete Avro data file):

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

// Ref: https://github.com/janvanzoggel/apache-kafka/blob/master/src/main/java/nl/rubix/kafka/KafkaConsumerAvroByte.java
public class KafkaConsumerAvroByte {

    private static final String BOOTSTRAP_SERVERS = "<KAFKA_BROKER_HOST>:9092";
    private static final String KEYSTORE_FILE_LOCATION = new File("kafka.client.keystore.jks").getAbsolutePath();
    private static final String KEYSTORE_FILE_PASSWORD = "dhgfgspasswd";
    private static final String SSL_KEY_PASSWORD = "dfsfjjkjdk";
    private static final String TRUSTSTORE_FILE_LOCATION = new File("kafka.client.truststore.jks").getAbsolutePath();
    private static final String TRUSTSTORE_FILE_PASSWORD = "UVctH3qsIbdwTc0B";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group_yh1vdjcschkj2");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Values arrive as raw bytes; the Avro decoding happens manually below.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // SSL client authentication against the broker.
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.location", KEYSTORE_FILE_LOCATION);
        props.put("ssl.keystore.password", KEYSTORE_FILE_PASSWORD);
        props.put("ssl.key.password", SSL_KEY_PASSWORD);
        props.put("ssl.truststore.location", TRUSTSTORE_FILE_LOCATION);
        props.put("ssl.truststore.password", TRUSTSTORE_FILE_PASSWORD);

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Note: the original also set "serializer.class" (kafka.serializer.DefaultEncoder),
        // an old Scala-producer setting with no effect on this consumer, so it is dropped.

        final Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("TEST_Topic"));

        while (true) {
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);

            // Decode: each value is expected to be a full Avro data file
            // (container format with the writer schema embedded), so
            // DataFileStream can read the schema straight out of the bytes.
            consumerRecords.forEach(consumerRecord -> {
                DatumReader<GenericRecord> reader = new GenericDatumReader<>();
                ByteArrayInputStream is = new ByteArrayInputStream(consumerRecord.value());
                try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(is, reader)) {
                    GenericRecord record = null;
                    while (dataFileReader.hasNext()) {
                        record = dataFileReader.next(record); // reuse the record object across iterations
                        System.out.println(record.getSchema());
                        System.out.println(record.toString());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
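Note on the decoder above: it works only when each message value is a complete Avro data file (container format, writer schema embedded in the bytes), which is why DataFileStream needs no registry. Messages produced with the Confluent KafkaAvroSerializer are not in that format (magic byte + schema id + body), so for those use KafkaAvroDeserializer as in the earlier sketches.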