Skip to content

Triggering Integration Flows via the WSO2 Integrator: MI

Introduction

In this tutorial, let's look at how the WSO2 Integrator: SI generates an alert based on the events received, and how that particular alert can trigger an integration flow in the WSO2 Integrator: MI, and get a response back to the WSO2 Integrator: SI for further processing.

To understand this, consider a scenario where the WSO2 Integrator: SI receives production data from a factory, and triggers an integration flow if it detects a per minute production average that exceeds 100.

Before you begin:

  • Start WSO2 Integrator: SI server.

  • The grpc Siddhi extension ships pre-installed with the WSO2 Integrator: SI distribution under <SI_HOME>/lib/. No separate installation is required.

Configuring the WSO2 Integrator: SI

Let's design a Siddhi application that triggers an integration flow and deploy it by following the procedure below:

  1. In WSO2 Integrator: SI welcome page, click Create New Siddhi Application to open a new application.

  2. Add a name and a description for your new Siddhi application as follows:

    @App:name("grpc-call-response")
    @App:description("This application triggers integration process in the WSO2 Integrator: MI using gRPC calls")
    
  3. Let's add an input stream to define the schema of input production events, and connect a source of the http type to receive those events.

    @source(type = 'http',
            receiver.url='http://localhost:8006/InputStream',
            basic.auth.enabled='false',
            @map(type='json'))
    define stream InputStream(symbol string, amount double);
    

    Here, the WSO2 Integrator: SI receives events to the http://localhost:8006/InputStream in the JSON format. Each event reports the product name (via the symbol attribute) and the amount produced.

  4. Now, let's add the configurations to publish an alert in the WSO2 Integrator: MI to trigger an integration flow, and then receive a response back into the WSO2 Integrator: SI.

    @sink(
            type='grpc-call',
            publisher.url = 'grpc://localhost:8888/org.wso2.grpc.EventService/process/inSeq',
            sink.id= '1', headers='Content-Type:json',
            @map(type='json', @payload("""{"symbol":"{{ symbol }}","avgAmount":{{ avgAmount }}}"""))
        )
    define stream FooStream (symbol string, avgAmount double);
    
    @source(type='grpc-call-response', sink.id= '1', @map(type='json'))
    define stream BarStream (symbol string, avgAmount double);
    

    Note the following in the above configuration:

    • Each output event that represents an alert that is published to the WSO2 Integrator: MI reports the product name and the average production (as per the schema of the FooStream stream.

    • The grpc-call sink connected to the FooStream stream gets the two attributes from the stream and generates the output events as JSON messages before they are published to the WSO2 Integrator: MI. The value for the publisher.url parameter in the sink configuration contains process and inSeq which means that the WSO2 Integrator: SI calls the process method of the gRPC Listener server in the WSO2 Integrator: MI, and injects the message to the inSeq which then sends a response back to the client.

    • The grpc-call-response source connected to the BarStream input stream retrieves a response from the WSO2 Integrator: MI and publishes it as a JSON message in the WSO2 Integrator: SI. As specified via the schema of the BarStream input stream, this response comprises of a single JSON message.

  5. To publish the messages received from the WSO2 Integrator: MI as logs in the terminal, let's define an output stream named LogStream, and connect a sink of the log type to it as shown below.

    @sink(type='log', prefix='response_from_mi: ')
    define stream LogStream (symbol string, avgAmount double);
    
  6. Define a Siddhi query named CalculateAverageProductionPerMinute to calculate the average production per minute:

    @info(name = 'CalculateAverageProductionPerMinute')
    from InputStream#window.timeBatch(1 min)
    select avg(amount) as avgAmount, symbol
    group by symbol
    insert into AVGStream;
    

    This query applies a time batch window to the InputStream stream so that events within each minute are considered as a separate subset for the calculations in the query. The minutes are considered in a tumbling manner because it is a batch window. Then the avg() function is applied to the amount attribute of the input stream to derive the average production amount. The results are inserted into an inferred stream named AVGStream.

  7. Define a Siddhi query named FilterExcessProduction to filter events from the AVGStream stream where the average production is greater than 100:

    @info(name = 'FilterExcessProduction')
    from AVGStream[avgAmount > 100]
    select symbol, avgAmount
    insert into FooStream;
    

    Here, the avgAmount > 100 filter is applied to filter only events that report an average production amount greater than 100. The filtered events are inserted into the FooStream stream.

  8. Define a Siddhi query named LogResponseEvents to log all the responses received from the WSO2 Integrator: MI:

    @info(name = 'LogResponseEvents')
    from BarStream
    select *
    insert into LogStream;
    

    The responses received from the WSO2 Integrator: MI are directed to the BarStream input stream. This query gets all of these events from the BarStream stream and inserts them into the LogStream stream, which is connected to a log sink so that they can be published as logs in the terminal.

    The Siddhi application is now complete.

    Click here to view the complete Siddhi application.
    @App:name("grpc-call-response")
    @App:description("This application triggers integration process in the WSO2 Integrator: MI using gRPC calls")
    
    @source(type = 'http',
                receiver.url='http://localhost:8006/InputStream',
                basic.auth.enabled='false',
                @map(type='json'))
    define stream InputStream(symbol string, amount double);
    
    @sink(
        type='grpc-call',
        publisher.url = 'grpc://localhost:8888/org.wso2.grpc.EventService/process/inSeq',
        sink.id= '1', headers='Content-Type:json',
        @map(type='json', @payload("""{"symbol":"{{ symbol }}","avgAmount":{{ avgAmount }}}"""))
    )
    define stream FooStream (symbol string, avgAmount double);
    
    @source(type='grpc-call-response', sink.id= '1', @map(type='json'))
    define stream BarStream (symbol string, avgAmount double);
    
    @sink(type='log', prefix='response_from_mi: ')
    define stream LogStream (symbol string, avgAmount double);
    
    @info(name = 'CalculateAverageProductionPerMinute')
    from InputStream#window.timeBatch(1 min)
    select avg(amount) as avgAmount, symbol
    group by symbol
    insert into AVGStream;
    
    @info(name = 'FilterExcessProduction')
    from AVGStream[avgAmount > 100]
    select symbol, avgAmount
    insert into FooStream;
    
    @info(name = 'LogResponseEvents')
    from BarStream
    select *
    insert into LogStream;
    
  9. Save the Siddhi application.

  10. Click the Run button in WSO2 Integrator: SI to run the Siddhi application.

Configuring WSO2 Integrator: MI

After doing the required configurations in the WSO2 Integrator: SI, let's configure the WSO2 Integrator: MI to receive the excess production alert from the WSO2 Integrator: SI as a gRPC event and send back a response.

  1. Start the gRPC server in the WSO2 Integrator: MI server to receive the WSO2 Integrator: SI event. To do this, save the following inbound endpoint configuration as GrpcInboundEndpoint.xml in the <MI_Home>/repository/deployment/server/synapse-configs/default/inbound-endpoints directory.

    <?xml version="1.0" encoding="UTF-8"?>
    <inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="GrpcInboundEndpoint" sequence="inSeq" onError="fault" protocol="grpc" suspend="false">
        <parameters>
            <parameter name="inbound.grpc.port">8888</parameter>
        </parameters>
    </inboundEndpoint>
    

    This configuration has a configuration parameter to start the gRPC server, and specifies the default sequence to inject messages accordingly.

  2. Deploy the following sequence by saving it as inSeq.xml file in the <MI_Home>/repository/deployment/server/synapse-configs/default/sequences directory.

    Info

    Note that the name of the sequence is inSeq. This is referred to in the gRPC sink configuration in the grpc-call-response Siddhi application you previously created in the WSO2 Integrator: SI.

    <?xml version="1.0" encoding="UTF-8"?>
    <sequence xmlns="http://ws.apache.org/ns/synapse" name="inSeq">
       <log level="full"/>
       <respond/>
    </sequence>
    

This sequence does the following:

  • Logs the response.

  • Sends the response back to the gRPC client.

  • Start the WSO2 Integrator: MI by issuing the appropriate command based on your operating system:

    ./micro-integrator.sh
    
    micro-integrator.bat
    

Executing and getting results

To send an event to the defined http source hosted in http://localhost:8006/InputStream, issue the following sample CURL command.

curl -X POST -d "{\"event\":{\"symbol\":\"soap\",\"amount\":110.23}}" http://localhost:8006/InputStream --header "Content-Type:application/json"

In the SI console an output similar to following will be printed after 1 minute (if the average of the amount is larger than 100)

INFO {io.siddhi.core.stream.output.sink.LogSink} - response_from_mi: : Event{timestamp=1573711436547, data=[soap, 110.23], isExpired=false}

Alternative: fire-and-forget via the grpc sink

As an alternative to the request/response pattern above, you can trigger a sequence in the WSO2 Integrator: MI without waiting for a response — a fire-and-forget flow. This is useful when the downstream integration does not need to send anything back, or when the WSO2 Integrator: SI does not need to react to the response.

The key difference is the sink type and the method path in the publisher URL:

  • Use @sink(type='grpc', ...) instead of grpc-call. The grpc sink does not expect a response and does not need a matching grpc-call-response source.

  • The publisher URL ends in /consume/inSeq instead of /process/inSeq. The consume method on the WSO2 Integrator: MI's gRPC inbound endpoint accepts the message and routes it to the named sequence without returning a response to the client. The process method, by contrast, routes to the sequence and then sends the sequence's <respond/> output back to the caller.

The following Siddhi application demonstrates the fire-and-forget variant:

@App:name("grpc-fire-and-forget")
@App:description("Triggers inSeq in the WSO2 Integrator: MI without waiting for a response")

@source(type='http',
        receiver.url='http://localhost:8006/InputStream',
        basic.auth.enabled='false',
        @map(type='json'))
define stream InputStream(symbol string, amount double);

@sink(
    type='grpc',
    publisher.url='grpc://localhost:8888/org.wso2.grpc.EventService/consume/inSeq',
    headers='Content-Type:json',
    @map(type='json'))
define stream FooStream(symbol string, amount double);

from InputStream
select *
insert into FooStream;

The WSO2 Integrator: MI-side artifacts (gRPC inbound endpoint and inSeq sequence) are the same as in the previous sections. The sequence's <respond/> mediator still fires, but the WSO2 Integrator: SI does not listen for it — the response is discarded.