Receiving Custom Text Events via Kafka
Purpose¶
This application demonstrates how to configure WSO2 Integrator: SI to receive events to the SweetProductionStream via Kafka transport in Text format using custom mapping and log the events in LowProductionAlertStream to the output console.
Prerequisites¶
- Setup Kafka.
- Kafka libs to be added and converted to OSGI from
{KafkaHome}/libsare as follows.- kafka_2.11-0.10.0.0.jar
- kafka-clients-0.10.0.0.jar
- metrics-core-2.2.0.jar
- scala-library-2.11.8.jar
- zkclient-0.8.jar
- zookeeper-3.4.6.jar
- Add the OSGI converted kafka libs to
<SI-Tooling-Home>/lib. - Add the kafka libs to
<SI-Tooling-Home>/samples/sample-clients/lib.
- Kafka libs to be added and converted to OSGI from
- Save this sample.
-
If there is no syntax error, the following message is shown on the console:
* Siddhi App PublishKafkaInJsonFormat successfully deployed.
Note¶
To convert Kafka libs to OSGI,
- Create a folder (eg: kafka) and copy Kafka libs to be added from
{KafkaHome}/libs. - Create another folder (eg: kafka-osgi, This folder will have the libs that converted to OSGI).
-
Navigate to
<SI-Tooling-Home>/binand issue the following command.- For Linux:
./jartobundle.sh <path/kafka> <path/kafka-osgi>- For Windows:
./jartobundle.bat <path/kafka> <path/kafka-osgi> -
If converted successfully then for each lib, following messages would be shown on the terminal.
- INFO: Created the OSGi bundle <kafka-lib-name>.jar for JAR file <absolute_path>/kafka/<kafka-lib-name>.jar -
You can find the osgi converted libs in kafka-osgi folder. You can copy that to
<SI-Tooling-Home>/lib.
Executing the sample¶
- Navigate to
{KafkaHome}and start zookeeper node using thebin/zookeeper-server-start.sh config/zookeeper.properties. - Navigate to
{KafkaHome}and start kafka server node usingbin/kafka-server-start.sh config/server.properties. - Start the Siddhi application by clicking on 'Run'.
-
If the Siddhi application starts successfully, the following messages are shown on the console:
- ReceiveKafkaInTextFormatWithCustomMapping.siddhi - Started Successfully! - Kafka version : 0.10.0.0 - Kafka commitId : b8642491e78c5a13 - Adding partition 0 for topic: kafka_sample_topic - Adding partitions [0] for topic: kafka_sample_topic - Subscribed for topics: [kafka_sample_topic] - Kafka Consumer thread starting to listen on topic/s: [kafka_sample_topic] with partition/s: [0] - Discovered coordinator 10.100.7.56:9092 (id: 2147483647 rack: null) for group group
Testing the sample¶
Navigate to <SI-Tooling-Home>/samples/sample-clients/kafka-producer and run ant command as follows:
ant -Dtype=text -DcustomMapping=true
Viewing the results¶
Messages similar to the following would be shown on the console.
- INFO {io.siddhi.core.stream.output.sink.LogSink} - ReceiveKafkaInTextFormatWithCustomMapping: LowProductionAlertStream : Event{timestamp=1513282182570, data=["Cupcake", 1665.0], isExpired=false}
Note¶
- Stop this Siddhi application, once you are done with the execution.
- Stop Kafka server and Zookeeper server individually by executing Ctrl+C.
@App:name("ReceiveKafkaInTextFormatWithCustomMapping")
@App:description('Receive events via Kafka transport in Text format with custom mapping and view the output on the console')
@source(type='kafka',
topic.list='kafka_sample_topic',
partition.no.list='0',
threading.option='single.thread',
group.id="group",
bootstrap.servers='localhost:9092',
@map(type='text',fail.on.missing.attribute='true', regex.A='(id):(.*)', regex.B='(amount):([-.0-9]+)',
@attributes(id = 'A[2]', amount = 'B[2]')))
define stream SweetProductionStream(id string, amount double);
@sink(type='log')
define stream LowProductionAlertStream(id string, amount double);
@info(name='query1')
from SweetProductionStream
select *
insert into LowProductionAlertStream;