Publishing Custom Avro Events via Kafka
Purpose¶
This application demonstrates how to configure WSO2 Integrator: SI Tooling to send sweet production events via Kafka transport in Avro format with custom mapping.
Prerequisites¶
-
Set up Kafka as follows:
- Create a folder called
kafkaand another folder calledkafka-osgi. - Copy the following files from
<Kafka-Home>/libsto thekafkafolder you just created:- kafka_2.11-0.10.0.0.jar
- kafka-clients-0.10.0.0.jar
- metrics-core-2.2.0.jar
- scala-library-2.11.8.jar
- zkclient-0.8.jar
- zookeeper-3.4.6.jar
- Copy these same files to the
<SI-Tooling-Home>/samples/sample-clients/libfolder. -
Navigate to
<SI-Tooling-Home>/binand issue the following command:- For Linux:
./jartobundle.sh <path/kafka> <path/kafka-osgi> - For Windows:
./jartobundle.bat <path/kafka> <path/kafka-osgi>
If converted successfully, the following messages are shown on the terminal for each lib file:
INFO: Created the OSGi bundle <kafka-lib-name>.jar for JAR file <absolute_path>/kafka/<kafka-lib-name>.jar - For Linux:
-
Copy the OSGi-converted Kafka libraries from the
kafka-osgifolder to<SI-Tooling-Home>/lib.
- Create a folder called
-
Save this sample.
- If there are no syntax errors, the following message is shown on the console:
Siddhi App PublishKafkaInAVroFormat successfully deployed.
Executing the Sample¶
- Navigate to
<Kafka-Home>and start the Zookeeper node usingbin/zookeeper-server-start.sh config/zookeeper.properties - Navigate to
<Kafka-Home>and start the Kafka server node usingbin/kafka-server-start config/server.properties. - Navigate to
<ConfluentHome>and start the Schema registry node using,bin/schema-registry-start ./etc/schema-registry/schema-registry.properties -
Post the Avro schema to the schema registry using,
curl -X POST -H "Content-Type: application/json" \ --data '{ "schema": "{ \"type\": \"record\", \"name\": \"sweetProduction\", \"namespace\": \"sweetProduction\", \"fields\": [ {\"name\": \"Name\", \"type\": \"string\" }, {\"name\": \"Amount\", \"type\": \"double\"} ] }" }' http://localhost:8081/subjects/sweet-production/versions -
Navigate to
<SI-Tooling-Home>/samples/sample-clients/kafka-avro-consumerand run the commandant -Dtype=avro -DisBinaryMessage=true - Start the Siddhi application by clicking on 'Run'.
-
If the Siddhi application starts successfully, the following messages are shown on the console:
- PublishKafkaInCustomAvroFormat.siddhi - Started Successfully! - Kafka version : 0.10.0.0 - Kafka commitId : 23c69d62a0cabf06 - Kafka producer created.
Testing the Sample¶
Send events through one or more of the following methods.
Option 1 - Send events to the Kafka sink via the event simulator¶
- Open the event simulator by clicking the second icon or pressing Ctrl+Shift+I.
- In the Single Simulation tab of the panel, specify the values as follows:
Siddhi App Name:PublishKafkaInCustomAvroFormatStream Name:SweetProductionStream
- In the
nameandamountfields, enter the following values and then clickSendto send the event.name:chocolate cakeamount:50.50
- Send some more events.
Option 2 - Publish events with Curl to the simulator HTTP endpoint¶
-
Open a new terminal and issue the following command:
curl -X POST -d '{"streamName": "SweetProductionStream", "siddhiAppName": "PublishKafkaInCustomAvroFormat","data": ["chocolate cake", 50.50]}' http://localhost:9390/simulation/single -H 'content-type: text/plain' -
If there is no error, the following messages are shown on the terminal:
{"status":"OK","message":"Single Event simulation started successfully"}
Option 3 - Publish events with Postman to the simulator HTTP endpoint¶
- Launch the Postman application.
-
Make a 'Post' request to the
http://localhost:9390/simulation/singleendpoint. Set theContent-Typetotext/plainand set the request body in text as follows:{ "streamName":"SweetProductionStream", "siddhiAppName":"PublishKafkaInCustomAvroFormat", "data":[ "chocolate cake", 50.50 ] } -
Click
Send. If there is no error, the following messages are shown on the console:- "status": "OK",
- "message": "Single Event simulation started successfully"
Viewing the Results¶
See the output on the terminal of <WSO2SIHome>/samples/sample-clients/kafka-avro-consumer:
[java] [org.wso2.extension.siddhi.io.kafka.source.KafkaConsumerThread] : Event received in Kafka Event Adaptor with offSet: 0, key: null, topic: kafka_result_topic, partition: 0
[java] [io.siddhi.core.stream.output.sink.LogSink] : KafkaSample : logStream : Event{timestamp=1546973319457, data=[chocolate cake, 50.5], isExpired=false}
Notes¶
If the message 'Kafka' sink at 'LowProductionAlertStream' has successfully connected to http://localhost:9092 does not appear, it could be that port 9092 defined in the Siddhi application is already being used by a different program. To resolve this issue, do the following,
- Stop this Siddhi application (Click 'Run' on menu bar -> 'Stop').
- In this Siddhi application's source configuration, change port 9092 to an unused port.
- Start the application and check whether the specified messages appear on the console.
@App:name("PublishKafkaInCustomAvroFormat")
@App:description('Send events via Kafka transport using Custom Avro format')
define stream SweetProductionStream (sweetName string, sweetAmount double);
@sink(type='kafka',
topic='kafka_result_topic',
is.binary.message='true',
bootstrap.servers='localhost:9092',
@map(type='avro',schema.def="""{"type":"record","name":"stock","namespace":"stock","fields":[{"name":"name","type":"string"},{"name":"amount","type":"double"}]}""",
@payload("""{"name": "{{ sweetName }}", "amount": {{ sweetAmount }}}""")))
define stream LowProductionAlertStream (sweetName string, sweetAmount double);
@info(name='EventsPassthroughQuery')
from SweetProductionStream
select *
insert into LowProductionAlertStream;