Capturing MySQL Inserts via CDC

Purpose

This sample demonstrates how to capture change data from MySQL using Siddhi. The change events that can be captured are INSERT, UPDATE, and DELETE operations.

Before you begin:

1. Ensure that MySQL is installed on your computer.

2. Add the MySQL JDBC driver to the <SI_HOME>/lib directory as follows:
   a. Download the latest MySQL JDBC driver (Connector/J) from the MySQL website.
   b. Unzip the archive.
   c. Copy the Connector/J JAR to the <SI_HOME>/lib directory.

3. Configure MySQL to enable binary logging. If you are using MySQL 8.0, use the following query to check the binlog status:

       SELECT variable_value AS "BINARY LOGGING STATUS (log-bin) ::"
       FROM performance_schema.global_variables
       WHERE variable_name = 'log_bin';

4. Enable state persistence for Siddhi applications. To do this, open the <SI_HOME>/conf/server/deployment.yaml file and set the enabled property under state.persistence to true.

5. Create a database named production by issuing the following command:

       CREATE DATABASE production;

6. Create a user named wso2sp with wso2 as the password, and grant it the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges. To do this, issue the following commands:

       CREATE USER 'wso2sp'@'localhost' IDENTIFIED WITH mysql_native_password BY 'wso2';
       GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'wso2sp'@'localhost';

7. Switch to the production database by issuing the following command:

       USE production;

8. Create a table named SweetProductionTable:

       CREATE TABLE SweetProductionTable (name VARCHAR(20), amount DOUBLE(10,2));

9. Save the sample Siddhi application in WSO2 Integrator: SI.

Executing the sample

To execute the sample, open the saved Siddhi application in WSO2 Integrator: SI, and start it by clicking the Start button or by clicking Run > Run. If the Siddhi application starts successfully, the following message appears in the console:

    CDCWithListeningMode.siddhi - Started Successfully!
Note
If you want to edit the Siddhi application after you have started it, stop it first, make your edits, save it, and then start it again.

Testing the sample

To test the sample Siddhi application, insert a record into the SweetProductionTable table that you created by issuing the following command:

    INSERT INTO SweetProductionTable VALUES ('chocolate', 100.0);

Viewing the results

The insert is logged in the WSO2 Integrator: SI console.

Info
Optionally, you can also use this sample to capture update and delete operations.
- Delete operation events include the before_name and before_amount keys.
- Update operation events include the before_name, name, before_amount, and amount keys.

The Siddhi application for this sample is as follows:

    @App:name('CDCWithListeningMode')
    @App:description('Capture MySQL Inserts using cdc source listening mode.')

    -- Listen for INSERT events on SweetProductionTable in the production database.
    @source(type = 'cdc',
        url = 'jdbc:mysql://localhost:3306/production',
        username = 'wso2sp',
        password = 'wso2',
        table.name = 'SweetProductionTable',
        operation = 'insert',
        @map(type = 'keyvalue'))
    define stream insertSweetProductionStream (name string, amount double);

    -- Print each captured event to the console.
    @sink(type = 'log')
    define stream logStream (name string, amount double);

    @info(name = 'query')
    from insertSweetProductionStream
    select name, amount
    insert into logStream;
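The Info box above mentions that update events carry the before_name, name, before_amount, and amount keys. As a minimal sketch of how the sample could be adapted to capture updates, the variant below changes the operation to 'update' and widens the stream definitions to include the before_ attributes. The application name CDCCaptureUpdates and the stream name updateSweetProductionStream are hypothetical names chosen for illustration; they are not part of the original sample.

```siddhi
@App:name('CDCCaptureUpdates')
@App:description('Hypothetical variant: capture MySQL updates using cdc source listening mode.')

-- Same connection settings as the insert sample; only the operation
-- and the stream attributes differ.
@source(type = 'cdc',
    url = 'jdbc:mysql://localhost:3306/production',
    username = 'wso2sp',
    password = 'wso2',
    table.name = 'SweetProductionTable',
    operation = 'update',
    @map(type = 'keyvalue'))
define stream updateSweetProductionStream (before_name string, name string, before_amount double, amount double);

-- Print the old and new values of each updated row to the console.
@sink(type = 'log')
define stream logStream (before_name string, name string, before_amount double, amount double);

@info(name = 'query')
from updateSweetProductionStream
select before_name, name, before_amount, amount
insert into logStream;
```

To try this variant, start it in the same way as the sample and then change an existing row, for example with UPDATE SweetProductionTable SET amount = 150.0 WHERE name = 'chocolate';. A delete variant would presumably follow the same pattern with operation = 'delete' and only the before_name and before_amount attributes.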