Aggregating Data Incrementally¶

Purpose¶

This example demonstrates how to get running statistics using Siddhi. The sample Siddhi application aggregates the data relating to the raw material purchases of a sweet production factory.

Before you begin:

1. Install MySQL.
2. Add the MySQL JDBC driver to your WSO2 Integrator: SI library as follows:
   a. Download the latest MySQL JDBC driver (Connector/J) from the MySQL site.
   b. Extract the MySQL JDBC driver zip file you downloaded. Then use the jartobundle tool in the <SI_HOME>/bin directory to convert the JARs in it into OSGi bundles. To do this, issue one of the following commands:
      For Windows:
          <SI_HOME>/bin/jartobundle.bat <PATH_OF_DOWNLOADED_JAR> <PATH_OF_CONVERTED_JAR>
      For Linux/macOS:
          <SI_HOME>/bin/jartobundle.sh <PATH_OF_DOWNLOADED_JAR> <PATH_OF_CONVERTED_JAR>
   c. Copy the converted bundles to the <SI_HOME>/lib directory.
3. Create a data store named sweetFactoryDB in MySQL with relevant access privileges.
4. Replace the values for the jdbc.url, username, and password parameters in the sample. For example:
      jdbc.url - jdbc:mysql://localhost:3306/sweetFactoryDB
      username - root
      password - root
5. In WSO2 Integrator: SI, save the sample Siddhi application.

Executing the Sample¶

To execute the sample Siddhi application, open it in WSO2 Integrator: SI and click the Start button, or click Run > Run. If the Siddhi application starts successfully, the following message appears in the console.

    AggregateDataIncrementally.siddhi - Started Successfully!

Testing the Sample¶

To test the sample Siddhi application, simulate single events for it via WSO2 Integrator: SI as follows:

- To open the Event Simulator, click the Event Simulator icon. This opens the event simulation panel.
- To simulate events for the RawMaterialStream stream of the AggregateDataIncrementally Siddhi application, enter information in the Single Simulation tab of the event simulation panel as follows.

| Field           | Value                      |
|-----------------|----------------------------|
| Siddhi App Name | AggregateDataIncrementally |
| StreamName      | RawMaterialStream          |

  As a result, the attributes of the RawMaterialStream stream (name and amount) appear in the panel.
- Send four events by entering values as shown below. Click Send after each event.
      Event 1: name: chocolate cake, amount: 100
      Event 2: name: chocolate cake, amount: 200
      Event 3: name: chocolate ice cream, amount: 50
      Event 4: name: chocolate ice cream, amount: 150
- In the StreamName field, select TriggerStream. In the triggerId field, enter 1 as the trigger ID, and then click Send.

Viewing the Results¶

The input and the corresponding output are displayed in the console as follows.

Info: The timestamps displayed in your console differ from the ones below because they are derived from the time at which you send the events.

    INFO {io.siddhi.core.stream.output.sink.LogSink} - AggregateDataIncrementally : RawMaterialStatStream : [Event{timestamp=1513612116450, data=[1537862400000, chocolate ice cream, 100.0], isExpired=false}, Event{timestamp=1513612116450, data=[chocolate cake, 150.0], isExpired=false}]

The sample Siddhi application is as follows.

    @App:name("AggregateDataIncrementally")
    @App:description('Aggregates values every second until year and gets statistics')

    define stream RawMaterialStream (name string, amount double);

    @sink(type ='log')
    define stream RawMaterialStatStream (AGG_TIMESTAMP long, name string, avgAmount double);

    @store(type="rdbms",
           jdbc.url="jdbc:mysql://localhost:3306/sweetFactoryDB",
           username="root",
           password="root",
           jdbc.driver.name="com.mysql.cj.jdbc.Driver")
    define aggregation stockAggregation
    from RawMaterialStream
    select name, avg(amount) as avgAmount, sum(amount) as totalAmount
        group by name
    aggregate every sec...year;

    define stream TriggerStream (triggerId string);

    @info(name = 'query1')
    from TriggerStream as f join stockAggregation as s
        within "2016-06-06 12:00:00 +05:30", "2020-06-06 12:00:00 +05:30"
        per 'hours'
    select AGG_TIMESTAMP, s.name, avgAmount
    insert into RawMaterialStatStream;
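
To see why the console reports 150.0 for chocolate cake and 100.0 for chocolate ice cream, the per-name arithmetic behind avg(amount) and sum(amount) can be reproduced outside Siddhi. The following is only an illustrative plain-Python sketch, not Siddhi code: the aggregate_by_name helper is hypothetical, and it ignores the sec...year granularities and the within/per clauses by assuming that all four simulated events fall into the same hourly bucket.

```python
from collections import defaultdict

# The four events sent to RawMaterialStream in the test: (name, amount).
events = [
    ("chocolate cake", 100.0),
    ("chocolate cake", 200.0),
    ("chocolate ice cream", 50.0),
    ("chocolate ice cream", 150.0),
]


def aggregate_by_name(events):
    """Hypothetical helper: return {name: (avgAmount, totalAmount)}.

    Keeps a running sum and count per name, so each event is folded in
    without revisiting earlier events.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for name, amount in events:
        totals[name] += amount
        counts[name] += 1
    return {name: (totals[name] / counts[name], totals[name]) for name in totals}


stats = aggregate_by_name(events)
for name, (avg_amount, total_amount) in stats.items():
    print(name, avg_amount, total_amount)
```

The averages computed here (150.0 and 100.0) correspond to the avgAmount values in the logged events. Keeping a running sum and count per group is what lets each new event update the result without rereading the earlier ones.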