Receiving Binary Events via Kafka
Purpose¶
This application demonstrates how to configure WSO2 Integrator: SI Tooling to receive events to the SweetProductionStream via Kafka transport in Binary Format and log the events in LowProductionAlertStream to the output console.
Prerequisites¶
-
The following steps must be executed to enable SI to receive events via the Kafka transport. Since you need to shut down the server to execute these steps, get a copy of these instructions prior to proceeding.
- Download the Kafka broker from here.
-
Convert and copy the Kafka client jars from the
{KafkaHome}/libsdirectory to the<SI-Tooling-Home>/libsdirectory as follows.- Create a directory named
{Source}in a preferred location in your machine and copy the following JARs to it from the{KafkaHome}/libsdirectory.- kafka_2.11-0.10.0.0.jar
- kafka-clients-0.10.0.0.jar
- metrics-core-2.2.0.jar
- scala-library-2.11.8.jar
- zkclient-0.8.jar
- zookeeper-3.4.6.jar
- Create another directory named
{Destination}in a preferred location in your machine. -
To convert all the Kafka jars you copied into the
{Source}directory, issue the following command,-
For Windows:
<SI-Tooling-Home>/bin/jartobundle.bat <{Source} Directory Path> <{Destination} Directory Path> -
For Linux:
sh <SI-Tooling-Home>/bin/jartobundle.sh <{Source} Directory Path> <{Destination} Directory Path>
-
-
Add the OSGI converted kafka libs from
{Destination}directory to<SI-Tooling-Home>/lib. - Add the original Kafka libs from
{Source}to<SI-Tooling-Home>/samples/sample-clients/lib. -
Navigate to
{KafkaHome}and start zookeeper node using following command.sh bin/zookeeper-server-start.sh config/zookeeper.properties -
Navigate to
{KafkaHome}and start Kafka server node using following command.sh bin/kafka-server-start.sh config/server.properties
- Create a directory named
-
Save this sample.
Executing the sample¶
- Start the Siddhi application by clicking on 'Run'.
- If the Siddhi application starts successfully, the following messages would be shown on the console.
* ReceiveKafkaInBinaryFormat.siddhi - Started Successfully!
Notes¶
If you edit this application while it's running, stop the application -> Save -> Start.
Testing the sample¶
Navigate to <SI-Tooling-Home>/samples/sample-clients/kafka-producer and run "ant" command as follows:
ant -DnoOfEventsToSend=5 -DtopicName=kafka_sample_topic -DisBinaryMessage=true
Viewing the results¶
Messages similar to the following would be shown on the console.
INFO {io.siddhi.core.stream.output.sink.LogSink} - ReceiveKafkaInBinaryFormat : LowProductionAlertStream : Event{timestamp=1513282182570, data=["Cupcake", 1665.0], isExpired=false}
Note¶
- Stop this Siddhi application, once you are done with the execution.
- Stop Kafka server and Zookeeper server individually by executing Ctrl+C.
@App:name("ReceiveKafkaInBinaryFormat")
@App:description('Receive events via Kafka transport in Binary format and view the output on the console')
@source(type='kafka',
topic.list='kafka_sample_topic',
partition.no.list='0',
threading.option='single.thread',
group.id='group',
is.binary.message='true',
bootstrap.servers='localhost:9092',
@map(type='binary'))
define stream SweetProductionStream(id string, amount double);
@sink(type='log')
define stream LowProductionAlertStream(id string, amount double);
@info(name='query1')
from SweetProductionStream
select *
insert into LowProductionAlertStream;