Receiving Events via File
Purpose¶
This application demonstrates how to use siddhi-io-file
for receiving.
Prerequisites¶
- Edit this sample file by replacing
<SI-Tooling-Home>
with the absolute path of your WSO2SI Tooling home directory. - Save this sample.
Executing the sample¶
- Start the Siddhi application by clicking on 'Run'.
-
If the Siddhi application starts successfully, the following messages would be shown on the console.
ReceiveEventsFromFile.siddhi - Started Successfully!
-
Check the directories
<SI-Tooling-Home>/samples/artifacts/ReceiveEventsFromFile/files/consumed
andnew
. - All the files which were in the directory
new
should have been moved toconsumed
directory.
Note¶
If the sample is not running and producing output, do the following first.
- Move all the files in
<SI-Tooling-Home>/samples/artifacts/ReceiveEventsFromFile/files/consumed
directory to<SI-Tooling-Home>/samples/artifacts/ReceiveEventsFromFile/files/new
. - Delete all the files in consumed and sink directories.
Viewing the results¶
Processed output events will be logged in the console as follows:
INFO {io.siddhi.core.query.processor.stream.LogStreamProcessor} - ReceiveEventsFromFile: event, StreamEvent{ timestamp=1513847875990, beforeWindowData=null, onAfterWindowData=null, outputData=[apache, 80.0, 2.0], type=CURRENT, next=null}
INFO {io.siddhi.core.query.processor.stream.LogStreamProcessor} - ReceiveEventsFromFile: event, StreamEvent{ timestamp=1513847876004, beforeWindowData=null, onAfterWindowData=null, outputData=[cloudbees, 134.4, 2.0], type=CURRENT, next=null}
@App:name('ReceiveEventsFromFile')
@source(type='file', mode='text.full',
dir.uri='file:/<SI-Tooling-Home>/samples/artifacts/ReceiveEventsFromFile/files/new',
action.after.process='move',
tailing='false',
move.after.process='file:/<SI-Tooling-Home>/samples/artifacts/ReceiveEventsFromFile/files/consumed',
@map(type='json'))
define stream SweetProductionStream (name string, amount double);
from SweetProductionStream#window.time(1 min)
select name, sum(amount) as hourlyTotal, convert(time:extract('HOUR', time:currentTimestamp(), 'yyyy-MM-dd hh:mm:ss'), 'double') as currentHour
insert into LowProductionAlertStream;
from LowProductionAlertStream#log('event')
insert into LogStream;