Publishing XML Events via MQTT
Purpose
This application demonstrates how to configure WSO2 Streaming Integrator Tooling to publish sweet production events in XML format via the MQTT transport, and how to view the output in the mqtt-consumer sample client.
Prerequisites
- Save this sample. The following message appears on the console.
    Siddhi App PublishMqttInXmlFormat successfully deployed.
- Set up the MQTT Mosquitto broker in Ubuntu Linux.
    - Add the mosquitto repository using the following commands.
        sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
        sudo apt-get update
    - Execute the following command to install the Mosquitto broker package.
        sudo apt-get install mosquitto
    - Install Mosquitto developer libraries to develop MQTT clients.
        sudo apt-get install libmosquitto-dev
    - Execute the following command to install Mosquitto client packages.
        sudo apt-get install mosquitto-clients
    - Ensure that the Mosquitto broker is running.
        sudo service mosquitto status
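As a complement to checking the service status, the following minimal Python sketch tests whether something is accepting TCP connections on the broker address. It assumes the default host and port used by this sample (localhost and 1883); the function name broker_is_reachable is hypothetical and not part of any WSO2 or Mosquitto tooling. A successful connection only shows that a process is listening on the port, not that it is Mosquitto.

```python
import socket


def broker_is_reachable(host: str = "localhost", port: int = 1883,
                        timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper: the defaults mirror the URL tcp://localhost:1883
    used in this sample. This checks TCP reachability only.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host could not be resolved.
        return False


if __name__ == "__main__":
    print("broker reachable:", broker_is_reachable())
```

If the function returns False, start the broker before running the sample.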
Executing the Sample
- Open a terminal, navigate to the {SI-Tooling-Home}/samples/sample-clients/mqtt-consumer directory, and run the ant command.
    - If you use the default topic mqtt_topic and URL tcp://localhost:1883 in your program, run the ant command without any arguments.
    - However, if you use a different topic, run the ant command with the appropriate argument, e.g. ant -Dtopic=mqttTest
- Start the Siddhi application by clicking 'Run'.
- If the Siddhi application starts successfully, the following message appears on the console.
    PublishMqttInXmlFormat.siddhi - Started Successfully!
Testing the Sample
- Open the event simulator by clicking the second icon or pressing Ctrl+Shift+I.
- In the Single Simulation tab of the panel, select values as follows:
    Siddhi App Name: PublishMqttInXmlFormat
    Stream Name: SweetProductionStream
- In the name and amount fields, enter 'toffee' and '45.24' respectively. Then click Send to send the event.
- Send some more events.
Viewing the Results
See the output in the terminal where you ran the client in {SI-Tooling-Home}/samples/sample-clients/mqtt-consumer. The output looks as follows (example for 3 events):
[java] [org.apache.axiom.om.util.StAXUtils] : XMLStreamReader is org.apache.axiom.util.stax.dialect.SJSXPStreamReaderWrapper
[java] [io.siddhi.core.stream.output.sink.LogSink] : PublishMqttInXmlFormatTest : logStream : Event{timestamp=1512462478777, data=[chocolate, 78.34], isExpired=false}
[java] [org.apache.axiom.om.util.StAXUtils] : XMLStreamReader is org.apache.axiom.util.stax.dialect.SJSXPStreamReaderWrapper
[java] [io.siddhi.core.stream.output.sink.LogSink] : PublishMqttInXmlFormatTest : logStream : Event{timestamp=1512462520264, data=[sweets, 34.57], isExpired=false}
[java] [org.apache.axiom.om.util.StAXUtils] : XMLStreamReader is org.apache.axiom.util.stax.dialect.SJSXPStreamReaderWrapper
[java] [io.siddhi.core.stream.output.sink.LogSink] : PublishMqttInXmlFormatTest : logStream : Event{timestamp=1512462534053, data=[coffee, 12.7], isExpired=false}
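The consumer log above shows events after they have been decoded. On the wire, each event travels as an XML document. The following Python sketch illustrates what such a payload might look like and how a client could parse it. It assumes the XML mapper's default layout, with no custom payload configured, wraps each event as events/event with one child element per attribute; that layout is an assumption and should be checked against the XML mapper documentation for your version. The function names to_default_xml and from_default_xml are hypothetical.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple


def to_default_xml(name: str, amount: float) -> str:
    """Build an XML payload for one SweetProductionStream event.

    Assumption: the default mapping nests attributes under
    <events><event>...</event></events>.
    """
    events = ET.Element("events")
    event = ET.SubElement(events, "event")
    ET.SubElement(event, "name").text = name
    ET.SubElement(event, "amount").text = str(amount)
    return ET.tostring(events, encoding="unicode")


def from_default_xml(payload: str) -> List[Tuple[str, float]]:
    """Parse a payload in the assumed layout into (name, amount) pairs."""
    root = ET.fromstring(payload)
    return [
        (event.findtext("name"), float(event.findtext("amount")))
        for event in root.iter("event")
    ]


if __name__ == "__main__":
    payload = to_default_xml("toffee", 45.24)
    print(payload)
    print(from_default_xml(payload))
```

The attribute names name and amount come from the stream definition of this sample, so the element names follow the stream schema rather than being fixed by the transport.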
Notes
If you need to edit this application while it is running, stop the application, make your changes, save it, and start it again.
If the message "'LowProductionAlertStream' stream could not connect to 'localhost:1883'" is logged, port 1883, which is defined in the Siddhi application, may already be in use by a different program. To resolve this issue, do the following:
- Stop this Siddhi application (click 'Run' on the menu bar -> 'Stop').
- Change port 1883 in the Siddhi application to an unused port. The broker must listen on the same port, so add a new listener in mosquitto.conf (e.g., listener 1884 or listener 1885).
- Start the application and check whether the expected output appears on the console.
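To pick an unused port for the steps above, a quick local check can help. The following Python sketch tries to bind each candidate port in turn and returns the first one that is free. The candidate ports and the function name first_free_port are illustrative assumptions; the check only reflects the state of the machine at the moment it runs, so another program could still claim the port afterwards.

```python
import socket
from typing import Iterable, Optional


def first_free_port(candidates: Iterable[int] = (1884, 1885, 1886),
                    host: str = "127.0.0.1") -> Optional[int]:
    """Return the first candidate port that can be bound on host, or None.

    Hypothetical helper: a successful bind means no other process is
    currently listening on that host and port.
    """
    for port in candidates:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                # Port is already in use (or not permitted); try the next one.
                continue
            return port
    return None


if __name__ == "__main__":
    print("first free port:", first_free_port())
```

Use the returned port both in the listener entry of mosquitto.conf and in the url parameter of the sink in the Siddhi application.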
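@App:name("PublishMqttInXmlFormat")

-- Input stream that receives the simulated sweet production events.
define stream SweetProductionStream (name string, amount double);

-- Publishes every event on this stream to the MQTT broker in XML format.
@sink(type='mqtt', url='tcp://localhost:1883', topic='mqtt_topic',
    @map(type='xml'))
define stream LowProductionAlertStream (name string, amount double);

-- Forwards all incoming events to the MQTT sink stream.
from SweetProductionStream
select *
insert into LowProductionAlertStream;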