Cleansing Data¶ When you receive input data via the WSO2 Integrator: SI, it may consist of data that is not required to generate the required output, null values for certain attributes, etc. Cleansing data refers to refining the input data received by assigning values where there are missing values (if there are applicable values), filtering out the data that is not required, etc. Filtering data based on conditions¶ You can filter data based on the exact match of attribute, based on a regex pattern or based on multiple criteria To understand this, consider a scenario where you receive the temperature of multiple rooms in a streaming manner. Filtering based on exact match of attribute: If you want the temperature readings only for a specific room, you can add a query with a filter as follows. from TempStream [roomNo==2233] select * insert into RoomAnalysisStream; With the roomNo==2233 filter, you are filtering the temperature readings for the room number 2233. These readings are then inserted into a separate stream named RoomAnalysisStream. Filtering based on regex pattern In the example of processing temperature readings, assume that you need to filter the temperature readings for a specific range of devices located in the Southern wing and used for purpose B. Also assume that this can be derived from the device ID because the first three characters of the device ID represent the wing, and the eighth character represents the purpose. e.g., in device ID SOU5438B765, the first three characters SOU represent the Southern wing, and the eighth character B represents purpose B. You can achieve this by adding a filter with a regex pattern as follows: @info(name = 'FilteredRoomRange') from TempStream select regex.find(SOU*B*) as deviceID, roomNo, temp insert into FilteredResultsStream; The regex.find(SOU*B*) function filters room IDs that start with the three characters SOU and has the character B with one or more characters before it as well as after it. Filtering based on multiple criteria In the example of processing temperature readings, assume that you need to filter the readings for a range of rooms (e.g., rooms 100-210) where the temperature is greater than 40. For this, you can add a filter as follows. from TempStream [(roomNo >= 100 and roomNo < 210) and temp > 40] select * insert into RoomAnalysisStream; Try it out¶ To try out the query used in the above example, let's include it in a Siddhi Application and run it. Open a new file. Then add and save the following Siddhi application. @App:name('TemperatureApp') define stream TempStream (deviceID string, roomNo int, temp double); @sink(type = 'log', prefix = "FilteredResult", @map(type = 'passThrough')) define stream RoomAnalysisStream (deviceID string, roomNo int, temp double); @info(name = 'Filtering2233') from TempStream[roomNo == 2233] select * insert into RoomAnalysisStream; 2. Open the event simulator and simulate three events for the TempStream input stream of the TemperatureApp Siddhi application with the values for the attributes as given below. This step requires the editor with WSO2 Integrator: SI installed. For instructions to simulate single events, see Testing Siddhi Applications - Simulating a single event. Event deviceID roomNo temp 1 SOU5438B765 2233 30 2 WES1827A876 1121 27 3 NOR1633B231 0899 28 Only the first event is logged in the terminal as follows. This is because only the first event has 2233 as the value for the roomNo attribute. INFO {io.siddhi.core.stream.output.sink.LogSink} - FilteredResult : Event{timestamp=1604494352744, data=[SOU5438B765, 2233, 30.0], isExpired=false} 3. Now remove the Filtering2233 query and replace it with the following query that filters based on multiple criteria. from TempStream [(roomNo >= 100 and roomNo < 210) and temp > 40] select * insert into RoomAnalysisStream; The complete Siddhi application is as follows: @App:name('TemperatureApp') define stream TempStream (deviceID string, roomNo int, temp double); @sink(type = 'log', prefix = "FilteredResult", @map(type = 'passThrough')) define stream RoomAnalysisStream (deviceID string, roomNo int, temp double); from TempStream [(roomNo >= 100 and roomNo < 210) and temp > 40] select * insert into RoomAnalysisStream; Open the event simulator and simulate three events for the TempStream input stream of the TemperatureApp Siddhi application with the values for the attributes as given below. This step requires the editor with WSO2 Integrator: SI installed. For instructions to simulate single events, see Testing Siddhi Applications - Simulating a single event. Event deviceID roomNo temp 1 SOU5438B765 183 30 2 WES1827A876 136 42 3 NOR1633B231 899 41 Only the second event is logged because only that event matched both the criteria. The event is logged as follows: INFO {io.siddhi.core.stream.output.sink.LogSink} - FilteredResult : Event{timestamp=1604557083556, data=[WES1827A876, 136, 42.0], isExpired=false} Modifying, removing, and replacing attributes¶ The input data may include attributes that are not required in order to generate the required output, attributes with values that need to be updated or replaced before further processing. Assume that in the previous example, you do not need the device ID for further processing, and you need to present the room numbers as string values instead of integer values. To do this, follow the procedure below: @info(name = 'CleaningData') from FilteredResultsStream select cast(roomNo, 'string') as roomNo, temp insert into CleansedDataStream; Here, the cast() function presents the value for the roomNo attribute as a string value although it is received as an integer value. The select clause excludes the deviceID attribute. Try it out¶ To try out the above example, follow the steps below: Open a new file. Then add and save the following Siddhi application. @App:name('TemperatureApp') define stream TempStream (deviceID string, roomNo int, temp double); @sink(type = 'log', prefix = "CleanedData", @map(type = 'passThrough')) define stream CleansedDataStream (roomNo string, temp double); @info(name = 'CleaningData') from TempStream select cast(roomNo, 'string') as roomNo, temp insert into CleansedDataStream; In this Siddhi application, the TempStream has an attribute named deviceID, but it is not selected to be included in the output events. The roomNo attribute is cast as a string value via cast(roomNo, "string"). This means although the value for this attribute is received as an integer, it is presented as a string value in the output. Open the event simulator and simulate an event for the TempStream input stream of the TemperatureApp Siddhi application with the values for the attributes as given below. This step requires the editor with WSO2 Integrator: SI installed. For instructions to simulate single events, see Testing Siddhi Applications - Simulating a single event. deviceID roomNo temp SOU5438B765 183 30 The output is logged as follows: INFO {io.siddhi.core.stream.output.sink.LogSink} - CleanedData : Event{timestamp=1604578130314, data=[183, 30.0], isExpired=false} Handling attributes with null values¶ In the example of processing temperature readings, assume that some events arrive with null values for the deviceID attribute, and you want to assign the value unknown in such scenarios. This can be achieved by writing a query as follows: @info(name = 'AddingMissingValues') from FilteredResultsStream select ifThenElse(deviceID is null, "UNKNOWN", deviceID) as deviceID, roomNo, temp insert into CleansedDataStream; Try it out¶ To try out the above example, follow the steps below: Install and start WSO2 Integrator: SI. Open a new file. Then add and save the following Siddhi application. @App:name('TemperatureApp') @App:description('Description of the plan') define stream TempStream (deviceID string, roomNo string, temp double); @sink(type = 'log', prefix = "Cleansed Data", @map(type = 'passThrough')) define stream CleansedDataStream (deviceID string, roomNo string, temp double); @info(name = 'AddingMissingValues') from TempStream select ifThenElse(deviceID is null, "UNKNOWN", deviceID) as deviceID, roomNo, temp insert into CleansedDataStream; In this Siddhi application, the TempStream stream has a deviceID attribute that may arrive with a null value. The ifThenElse(deviceID is null, "UNKNOWN", deviceID) expression substitutes "UNKNOWN" whenever deviceID is null and passes the original value through otherwise. The result is inserted into CleansedDataStream so downstream queries see a non-null deviceID. Open the event simulator and simulate an event for the TempStream input stream of the TemperatureApp Siddhi application with the values for the attributes as given below. This step requires the editor with WSO2 Integrator: SI installed. For instructions to simulate single events, see Testing Siddhi Applications - Simulating a single event. deviceID roomNo temp Select the Is Null check box for this attribute 183 30 The output is logged as follows: INFO {io.siddhi.core.stream.output.sink.LogSink} - Cleansed Data : Event{timestamp=1604581209943, data=[UNKNOWN, 183, 30.0], isExpired=false}