Receiving Events via File¶ Purpose¶ This application demonstrates how to use siddhi-io-file for receiving file-based events. Prerequisites¶ Install the file Siddhi extension. For instructions, see Downloading and Installing Siddhi Extensions. Two directories that the Siddhi application can read from and write to. This example uses <YOUR_HOME>/file-samples/new (source) and <YOUR_HOME>/file-samples/consumed (destination). Try it out¶ Create the source and destination directories: mkdir -p <YOUR_HOME>/file-samples/new mkdir -p <YOUR_HOME>/file-samples/consumed Edit the Siddhi application below by replacing <YOUR_HOME> with the absolute path of the parent directory you chose, and save it. Start the Siddhi application by clicking Run. The following message appears in the console: ReceiveEventsFromFile.siddhi - Started Successfully! Drop one or more JSON event files into <YOUR_HOME>/file-samples/new. Each file should contain a JSON event matching the SweetProductionStream schema, for example: {"event":{"name":"apache","amount":80.0}} The application reads each file, processes it, and moves it to <YOUR_HOME>/file-samples/consumed. Processed events are logged in the console: INFO {io.siddhi.core.query.processor.stream.LogStreamProcessor} - ReceiveEventsFromFile: event, StreamEvent{ timestamp=1513847875990, beforeWindowData=null, onAfterWindowData=null, outputData=[apache, 80.0, 2.0], type=CURRENT, next=null} INFO {io.siddhi.core.query.processor.stream.LogStreamProcessor} - ReceiveEventsFromFile: event, StreamEvent{ timestamp=1513847876004, beforeWindowData=null, onAfterWindowData=null, outputData=[cloudbees, 134.4, 2.0], type=CURRENT, next=null} Re-running the example If you want to re-process the same files, move them back from the consumed directory to the new directory before re-running. Siddhi application¶ @App:name('ReceiveEventsFromFile') @source(type='file', mode='text.full', dir.uri='file:<YOUR_HOME>/file-samples/new', action.after.process='move', tailing='false', move.after.process='file:<YOUR_HOME>/file-samples/consumed', @map(type='json')) define stream SweetProductionStream (name string, amount double); from SweetProductionStream#window.time(1 min) select name, sum(amount) as hourlyTotal, convert(time:extract('HOUR', time:currentTimestamp(), 'yyyy-MM-dd hh:mm:ss'), 'double') as currentHour insert into LowProductionAlertStream; from LowProductionAlertStream#log('event') insert into LogStream;