Siddhi Application Overview
A Siddhi application (.siddhi) file is the deployment artifact containing the Stream Processing logic for WSO2 Integrator: SI.
The format of a Siddhi application is as follows:
```siddhi
@App:name("ReceiveAndCount")
@App:description('Receive events via HTTP transport and view the output on the console')

/*
Sample Siddhi App block comment
*/

-- Sample Siddhi App line comment
@Source(type = 'http',
    receiver.url='http://localhost:8006/productionStream',
    basic.auth.enabled='false',
    @map(type='json'))
define stream SweetProductionStream (name string, amount double);

@sink(type='log')
define stream TotalCountStream (totalCount long);

-- Count the incoming events
@info(name='query1')
from SweetProductionStream
select count() as totalCount
insert into TotalCountStream;
```
Basic information about Siddhi applications
The following are some important points to note about Siddhi applications:
- The file name of each Siddhi application must match the name specified via the `@App:name()` annotation. For example, the application name in the sample Siddhi application above is `ReceiveAndCount`, so the Siddhi file name must be `ReceiveAndCount.siddhi`.
- Providing a description via the `@App:description()` annotation is optional.
- The definitions of the required streams, windows, tables, triggers, and aggregations must appear before the Siddhi queries. For example, in the sample above, the `SweetProductionStream` and `TotalCountStream` streams are defined before the query `query1`.
- Siddhi can infer stream definitions, so you do not need to define every stream. However, any stream that needs annotations must be explicitly defined.
- The sample above also shows how to include comments in a Siddhi application: a block comment (`/* ... */`) and a line comment (`-- ...`).
For more information about Siddhi applications, see Siddhi Application in the Siddhi Streaming SQL Guide.
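The following minimal sketch illustrates these points. It is a hypothetical application, and all of its stream, query, and application names are illustrative rather than taken from a shipped sample. It would be saved as `FilterLargeProduction.siddhi` to match its `@App:name`, and it places every definition before the queries. The intermediate `LargeProductionStream` is never defined, so Siddhi infers its schema from the query that inserts into it. `LargeProductionAlertStream` is defined explicitly because it carries a `@sink` annotation.

```siddhi
@App:name("FilterLargeProduction")
@App:description('Hypothetical sample illustrating stream inference')

define stream SweetProductionStream (name string, amount double);

-- Must be defined explicitly because it carries a @sink annotation
@sink(type = 'log')
define stream LargeProductionAlertStream (name string, amount double);

-- LargeProductionStream is not defined anywhere: Siddhi infers its
-- schema (name string, amount double) from this query's select clause
@info(name = 'filterQuery')
from SweetProductionStream[amount > 100]
select name, amount
insert into LargeProductionStream;

-- The inferred stream can then be consumed like any defined stream
@info(name = 'alertQuery')
from LargeProductionStream
select name, amount
insert into LargeProductionAlertStream;
```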
Common elements of a Siddhi application
This section explains the common types of definitions and queries that are included in a Siddhi application:
Queries
Queries define the logical processing and selections that must be executed on streaming events. They consume events from predefined streams, windows, tables, or aggregations, process them in a streaming manner, and insert the output into another stream, window, or table. For more information about Siddhi queries, see Queries in the Siddhi Streaming SQL Guide.
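For illustration, the following sketch shows a query that filters and projects events from the sample's `SweetProductionStream`. The output stream `BulkProductionStream`, the 500 threshold, and the assumption that `amount` is measured in kilograms are all hypothetical.

```siddhi
define stream SweetProductionStream (name string, amount double);

-- Filter: only events whose amount exceeds 500 pass through.
-- Projection: keep the name and convert the amount (assumed to be in kg) to tonnes.
@info(name = 'bulkProductionQuery')
from SweetProductionStream[amount > 500]
select name, amount / 1000 as amountInTonnes
insert into BulkProductionStream;
```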
Streams
Streams are one of the core elements of a stream processing application. A stream is a logical series of events ordered in time. It has a uniquely identifiable name and a set of defined attributes, each with a specific data type, which together define its schema. In Siddhi, a stream is defined by specifying its name and the set of attributes it contains. The `SweetProductionStream` and `TotalCountStream` definitions in the sample above are examples of defined streams. For more information on Siddhi streams, see Streams in the Siddhi Streaming SQL Guide.
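The following sketch defines a hypothetical stream whose attributes use several of Siddhi's attribute types (`string`, `int`, `long`, `double`, `bool`). The stream and attribute names are illustrative only.

```siddhi
-- Hypothetical sensor stream:
--   sensorId    : text identifier
--   roomNo      : 32-bit integer
--   readingTime : epoch time in milliseconds
--   temp        : floating-point reading
--   isActive    : true/false flag
define stream SensorReadingStream (sensorId string, roomNo int, readingTime long, temp double, isActive bool);
```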
Tables
A table is a collection of events that can be used to store streaming data. Storing events in a table allows you to query them later or process them again with a different stream. The generic table concept applies here as well. However, Siddhi tables also support table-specific operations such as defining primary keys and indexes. For more information on Siddhi tables, see Storage Integration and Tables in the Siddhi Streaming SQL Guide.
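A minimal sketch of a table with a primary key and an index follows. It keeps the latest amount per sweet and later answers lookups from another stream. `SweetStockTable`, `StockCheckStream`, and `StockLevelStream` are hypothetical names. With no `@store` annotation, the table is held in memory.

```siddhi
define stream SweetProductionStream (name string, amount double);
define stream StockCheckStream (name string);

-- name is the primary key; amount is indexed to speed up lookups on it
@PrimaryKey('name')
@Index('amount')
define table SweetStockTable (name string, amount double);

-- Insert a row for a new sweet, or overwrite the row that has the same name
@info(name = 'updateStock')
from SweetProductionStream
select name, amount
update or insert into SweetStockTable
    on SweetStockTable.name == name;

-- Query the stored events later: each check event is joined with the table
@info(name = 'checkStock')
from StockCheckStream join SweetStockTable
    on StockCheckStream.name == SweetStockTable.name
select SweetStockTable.name, SweetStockTable.amount
insert into StockLevelStream;
```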
Windows
Windows allow you to retain a collection of streaming events based on a time duration (time window) or a given number of events (length window). You can then process the events that fall into the defined window or that expire from it. For more information on Siddhi windows, see Windows in the Siddhi Streaming SQL Guide.
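The following sketch shows both window kinds described above, applied to the sample's `SweetProductionStream`. The output stream names and the window sizes (1 minute, 10 events) are hypothetical choices.

```siddhi
define stream SweetProductionStream (name string, amount double);

-- Time window: total amount per sweet over a sliding one-minute window
@info(name = 'lastMinuteTotals')
from SweetProductionStream#window.time(1 min)
select name, sum(amount) as totalAmount
group by name
insert into LastMinuteTotalStream;

-- Length window: retain the last 10 events and emit each event only when
-- it expires, that is, when a newer arrival pushes it out of the window
@info(name = 'expiredFromLengthWindow')
from SweetProductionStream#window.length(10)
select name, amount
insert expired events into ExpiredProductionStream;
```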
Aggregations
Aggregations allow you to aggregate streaming events over different time granularities. The supported time granularities are seconds, minutes, hours, days, months, and years. Aggregate values such as sum, min, and avg can be calculated for the desired duration(s) via Siddhi aggregation. For more information on Siddhi aggregations, see Aggregations in the Siddhi Streaming SQL Guide.
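The following minimal sketch defines an aggregation over every granularity from seconds to years and then retrieves daily values through a join. It assumes a hypothetical `timestamp` attribute (epoch milliseconds) on the input stream to serve as event time. The request stream, output stream, and February 2024 date range are also illustrative.

```siddhi
define stream SweetProductionStream (name string, amount double, timestamp long);
define stream ProductionReportRequestStream (name string);

-- Aggregate per sweet at every granularity from seconds to years,
-- using each event's timestamp attribute as its event time
define aggregation SweetProductionAggregation
from SweetProductionStream
select name, sum(amount) as totalAmount, avg(amount) as avgAmount, min(amount) as minAmount
group by name
aggregate by timestamp every sec ... year;

-- For each request, fetch the requested sweet's per-day values for February 2024
@info(name = 'dailyReport')
from ProductionReportRequestStream as r join SweetProductionAggregation as a
    on r.name == a.name
    within "2024-02-01 00:00:00 +00:00", "2024-03-01 00:00:00 +00:00"
    per "days"
select a.name, a.totalAmount, a.avgAmount, a.minAmount
insert into DailyProductionReportStream;
```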
Persisted Aggregations
Note: This capability was released as a product update on 11/06/2021. If you do not already have this update, you can get the latest updates now.
With persisted aggregation, the aggregations for higher granularities are executed on top of the database at the end of each time granularity (day: at the end of the day; month: at the end of the month; year: at the end of the year). This is the recommended approach when the group-by attributes of the aggregation have a large number of unique combinations.
Enabling Persisted Aggregation
To enable persisted aggregation, add the @persistedAggregation(enable="true") annotation above the aggregation definition. The aggregation query is executed using the cud function provided by siddhi-store-rdbms, so you must also enable CUD operations by adding the following configuration to the deployment.yaml file:
```yaml
siddhi:
  extensions:
    -
      extension:
        name: cud
        namespace: rdbms
        properties:
          perform.CUD.operations: true
```

To use persisted aggregation, you must also configure a datasource in the deployment.yaml file and reference it in the @store annotation of the aggregation definition.
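As a minimal sketch, the following aggregation combines the two requirements described in this section: the `@persistedAggregation` annotation and a `@store` annotation that points to a datasource. `SWEET_ANALYTICS_DB` is a hypothetical datasource name that would also need to be declared in deployment.yaml. The sketch assumes the CUD configuration shown in this section is in place.

```siddhi
define stream SweetProductionStream (name string, amount double, timestamp long);

-- SWEET_ANALYTICS_DB is a hypothetical datasource declared in deployment.yaml;
-- @store directs the aggregation tables to that RDBMS datasource
@store(type = 'rdbms', datasource = 'SWEET_ANALYTICS_DB')
@persistedAggregation(enable = "true")
define aggregation SweetProductionAggregation
from SweetProductionStream
select name, sum(amount) as totalAmount, avg(amount) as avgAmount
group by name
aggregate by timestamp every sec ... year;
```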
When you use persisted aggregation with MySQL, specify the aggregation processing timezone in the JDBC URL. By default, the MySQL database uses the server timezone for some of the time-related conversions in an aggregation query. For example:

```
jdbc:mysql://localhost:3306/TEST_DB?useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC
```

When you use persisted aggregation with Oracle, add the following property to the datasource configuration:

```yaml
connectionInitSql: alter session set NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'
```

e.g.:

```yaml
- name: APIM_ANALYTICS_DB
  description: "The datasource used for APIM statistics aggregated data."
  jndiConfig:
    name: jdbc/APIM_ANALYTICS_DB
  definition:
    type: RDBMS
    configuration:
      jdbcUrl: 'jdbc:oracle:thin:@localhost:1521:XE'
      username: 'root'
      password: '123'
      driverClassName: oracle.jdbc.OracleDriver
      maxPoolSize: 50
      idleTimeout: 60000
      connectionTestQuery: SELECT 1 FROM DUAL
      connectionInitSql: alter session set NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'
      validationTimeout: 30000
      isAutoCommit: false
```

The following sample aggregation is an example. For this aggregation, a query is executed on the database to update the aggregation table:

```siddhi
@persistedAggregation(enable="true")
define aggregation ResponseStreamAggregation
from processedResponseStream
select api, version, apiPublisher, applicationName, protocol, consumerKey, userId, context, resourcePath, responseCode,
    avg(serviceTime) as avg_service_time, avg(backendTime) as avg_backend_time,
    sum(responseTime) as total_response_count, time1, epoch, eventTime
group by api, version, apiPublisher, applicationName, protocol, consumerKey, userId, context, resourcePath, responseCode, time1, epoch
aggregate by eventTime every min;
```
The elements described above work together in a Siddhi application to form an event flow. To see how the elements of a Siddhi application are interconnected, you can open the application in the design view. For more information, see Stream Processor Studio Overview.