Iterate/Aggregate Fault Handling in WSO2 EI

In WSO2 we can implement the Splitter and Aggregator EIP using the Iterate and Aggregate mediators. With the Splitter pattern we can split a message composed by different elements that needs to be processed individually, and then we use the Aggregator pattern to aggregate the results of each individual call and then perform some processing over the aggregated results.

Happy Path Example

In a happy path example all the requests and processing done inside the iterate mediator will occur with no failures and the aggregate mediator will handle the results of all the requests made. We can see that in the proxy below:

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ProxyIterateAggregateFaultNotWorking" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- This is to force going to fault sequence in case of SOAP Fault -->
            <!-- Set this to force a fault in case of SOAPFault returned by the backend service -->
            <payloadFactory media-type="xml">
                <format>
                    <payload xmlns="">
                        <echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
                            <in>1</in>
                        </echo:echoInt>
                        <echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
                            <in>1</in>
                        </echo:echoInt>
                    </payload>
                </format>
                <args/>
            </payloadFactory>
            <!-- 1. Iterate over the echoInt elements -->
            <iterate expression="//echo:echoInt" xmlns:echo="http://echo.services.core.carbon.wso2.org">
                <target>
                    <sequence>
                        <property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"/>
                        <header name="Action" scope="default" value="urn:echoInt"/>
                        <call>
                            <endpoint>
                                <address format="soap11" uri="http://localhost:8280/services/echo">
                                    <suspendOnFailure>
                                        <initialDuration>-1</initialDuration>
                                        <progressionFactor>-1</progressionFactor>
                                        <maximumDuration>0</maximumDuration>
                                    </suspendOnFailure>
                                    <markForSuspension>
                                        <retriesBeforeSuspension>0</retriesBeforeSuspension>
                                    </markForSuspension>
                                </address>
                            </endpoint>
                        </call>
                    </sequence>
                </target>
            </iterate>
            <property name="result" scope="default">
                <result xmlns=""/>
            </property>
            <aggregate>
                <completeCondition>
                    <messageCount max="-1" min="-1"/>
                </completeCondition>
                <onComplete enclosingElementProperty="result" expression="$body/*[1]">
                    <log level="custom">
                        <property name="ON Aggregate SEQ" value="faultSequence default"/>
                    </log>
                    <respond/>
                </onComplete>
            </aggregate>
        </inSequence>
        <outSequence/>
        <faultSequence>
            <log level="custom">
                <property name="ON FAULT SEQ" value="faultSequence default"/>
            </log>
        </faultSequence>
    </target>
</proxy>

This proxy basically creates a payload contaning multiple echoInt requests, and then we iterate over each echoInt to make a request to the echo service. Inside the iterate we set the Action header and make a request to the endpoint using the call mediator. We added the property FORCE_ERROR_ON_SOAP_FAULT so in case of a SOAPFault returned by the backend service the flow will be redirected to the fault sequence. In this example, there is no error, so, the aggregate mediator after the iterate will gather all the responses and aggregate them in a single message and respond back. We can see the response of this Proxy service in the xml below:

<result>
    <ns:echoIntResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
        <return>1</return>
    </ns:echoIntResponse>
    <ns:echoIntResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
        <return>1</return>
    </ns:echoIntResponse>
</result>

Fault Example

In the proxy below, we made a small change in the payloadFactory in order to force a soap fault from the backend:

<payloadFactory media-type="xml">
    <format>
        <payload xmlns="">
            <echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
                <in>1</in>
            </echo:echoInt>
            <echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
                <in>abc</in>
            </echo:echoInt>
        </payload>
    </format>
    <args/>
</payloadFactory>

We are passing the value ‘abc’ in the second echoInt element. With this example, when we try the proxy we are not going to receive any response as the flow is redirected to the faultSequence and we can see the log entry:

[2020-02-16 21:32:41,526] [EI-Core]  INFO - LogMediator ON FAULT SEQ = faultSequence default

The full proxy service code can be seen below:

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ProxyIterateAggregateFaultNotWorking" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- This is to force going to fault sequence in case of SOAP Fault -->
            <!-- Set this to force a fault in case of SOAPFault returned by the backend service -->
            <payloadFactory media-type="xml">
                <format>
                    <payload xmlns="">
                        <echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
                            <in>1</in>
                        </echo:echoInt>
                        <echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
                            <in>abc</in>
                        </echo:echoInt>
                    </payload>
                </format>
                <args/>
            </payloadFactory>
            <!-- 1. Iterate over the echoInt elements -->
            <iterate expression="//echo:echoInt" xmlns:echo="http://echo.services.core.carbon.wso2.org">
                <target>
                    <sequence>
                        <property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"/>
                        <header name="Action" scope="default" value="urn:echoInt"/>
                        <call>
                            <endpoint>
                                <address format="soap11" uri="http://localhost:8280/services/echo">
                                    <suspendOnFailure>
                                        <initialDuration>-1</initialDuration>
                                        <progressionFactor>-1</progressionFactor>
                                        <maximumDuration>0</maximumDuration>
                                    </suspendOnFailure>
                                    <markForSuspension>
                                        <retriesBeforeSuspension>0</retriesBeforeSuspension>
                                    </markForSuspension>
                                </address>
                            </endpoint>
                        </call>
                    </sequence>
                </target>
            </iterate>
            <property name="result" scope="default">
                <result xmlns=""/>
            </property>
            <aggregate>
                <completeCondition>
                    <messageCount max="-1" min="-1"/>
                </completeCondition>
                <onComplete enclosingElementProperty="result" expression="$body/*[1]">
                    <log level="custom">
                        <property name="ON Aggregate SEQ" value="faultSequence default"/>
                    </log>
                    <respond/>
                </onComplete>
            </aggregate>
        </inSequence>
        <outSequence/>
        <faultSequence>
            <log level="custom">
                <property name="ON FAULT SEQ" value="faultSequence default"/>
            </log>
        </faultSequence>
    </target>
</proxy>

Fault Handling with Iterate/Aggregate

In order to still have the aggregate to work when having a fault, we need to make some changes to proxy and make use of sequences. After the changes we are going to have the following artifacts:

The proxy service code can be seen below:

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ProxyIterateAggregateWorking" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- This is to force going to fault sequence in case of SOAP Fault -->
            <!-- Set this to force a fault in case of SOAPFault returned by the backend service -->
            <payloadFactory media-type="xml">
                <format>
                    <payload xmlns="">
                        <echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
                            <in>1</in>
                        </echo:echoInt>
                        <echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
                            <in>abc</in>
                        </echo:echoInt>
                    </payload>
                </format>
                <args/>
            </payloadFactory>
            <!-- 1. Iterate over the echoInt elements -->
            <iterate expression="//echo:echoInt" xmlns:echo="http://echo.services.core.carbon.wso2.org">
                <target>
                    <sequence>
                        <sequence key="IterateSequence"/>
                    </sequence>
                </target>
            </iterate>
            <sequence key="AggregateSequence"/>
        </inSequence>
        <outSequence/>
        <faultSequence>
            <log level="custom">
                <property name="ON FAULT SEQ" value="faultSequence default"/>
            </log>
        </faultSequence>
    </target>
</proxy>

The main difference from the previous proxy is that inside the iterate we are using a predefined sequence instead of an anonymous sequence. And we have the AggregateSequence just after the iterate mediator.

The AggregateSequence just contains the code that we had previously inside the proxy:

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="AggregateSequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property name="result" scope="default">
        <result xmlns=""/>
    </property>
    <aggregate>
        <completeCondition>
            <messageCount max="-1" min="-1"/>
        </completeCondition>
        <onComplete enclosingElementProperty="result" expression="$body/*[1]">
            <log level="custom">
                <property name="ON Aggregate SEQ" value="faultSequence default"/>
            </log>
            <respond/>
        </onComplete>
    </aggregate>
</sequence>

There is nothing special in that sequence.

Now let us see the IterateSequence:

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="IterateSequence" onError="IterateFaultHandler" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Set this to force a fault in case of SOAPFault returned by the backend service -->
    <property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"/>
    <header name="Action" scope="default" value="urn:echoInt"/>
    <call>
        <endpoint>
            <address format="soap11" uri="http://localhost:8280/services/echo">
                <suspendOnFailure>
                    <initialDuration>-1</initialDuration>
                    <progressionFactor>-1</progressionFactor>
                    <maximumDuration>0</maximumDuration>
                </suspendOnFailure>
                <markForSuspension>
                    <retriesBeforeSuspension>0</retriesBeforeSuspension>
                </markForSuspension>
            </address>
        </endpoint>
    </call>
</sequence>

It contains the same code that we had in the proxy but the tricky is in the onError property of the sequence where we specify which sequence should be executed in case of errors in the sequence, that in this case is the IterateFaultHandler. So, in this case, when it tries to execute the echoInt with the abc value it will receive the SOAPFault and will redirect the flow to the IterateFaultHandler.

So, now let us see the IterateFaultHandler:

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="IterateFaultHandler" onError="IterateFaultHandler" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <log level="custom">
        <property name="ON ITERATE FAULT SEQ" value="IterateFaultHandler"/>
    </log>
    <payloadFactory media-type="xml">
        <format>
            <errorProcessing xmlns="">
                <error>$1</error>
            </errorProcessing>
        </format>
        <args>
            <arg evaluator="xml" expression="//faultstring"/>
        </args>
    </payloadFactory>
    <property name="RESPONSE" scope="default" type="STRING" value="true"/>
    <sequence key="AggregateSequence"/>
</sequence>

Inside this fault sequence we create an error payload with the faultstring. Then we set the RESPONSE property to true, this indicates to the synapse engine that this message is in the response direction. And then we call the AggregateSequence. We need to specify the RESPONSE property because the aggregate mediator expects the message flow to be in the response direction.

So, now when we try the proxy it will receive a fault for the element with the invalid value and then redirect to the fault sequence, it will generate that error payload and calls the aggregate. We can see below the error log indicating that it executed the fault sequence and then the final payload:

[2020-02-16 21:55:48,155] [EI-Core]  INFO - LogMediator ON ITERATE FAULT SEQ = IterateFaultHandler

The response payload:

<result>
    <ns:echoIntResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
        <return>1</return>
    </ns:echoIntResponse>
    <errorProcessing>
        <error>Invalid value "abc" for element in</error>
    </errorProcessing>
</result>

Now, with this approach the aggregate is always executed and the flow finishes without any issues.

You may ask, “Why not call the AggregateSequence inside the Proxy faultSequence?” When we try using the AggregateSequence inside the faultSequence of the proxy it will throw the error below and that does not happen when we use a named sequence for the fault handling:

[2020-02-16 21:59:54,782] [EI-Core] ERROR - SequenceMediator Runtime error occurred while mediating the message
java.util.EmptyStackException
    at java.util.Stack.peek(Stack.java:102)
    at org.apache.synapse.mediators.eip.aggregator.AggregateMediator.mediate(AggregateMediator.java:302)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
    at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:158)
    at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:214)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
    at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:158)
    at org.apache.synapse.mediators.MediatorFaultHandler.onFault(MediatorFaultHandler.java:96)
    at org.apache.synapse.FaultHandler.handleFault(FaultHandler.java:53)
    at org.apache.synapse.endpoints.AbstractEndpoint.invokeNextFaultHandler(AbstractEndpoint.java:735)
    at org.apache.synapse.endpoints.AbstractEndpoint.onFault(AbstractEndpoint.java:550)
    at org.apache.synapse.endpoints.AddressEndpoint.onFault(AddressEndpoint.java:46)
    at org.apache.synapse.FaultHandler.handleFault(FaultHandler.java:101)
    at org.apache.synapse.core.axis2.SynapseCallbackReceiver.handleMessage(SynapseCallbackReceiver.java:527)
    at org.apache.synapse.core.axis2.SynapseCallbackReceiver.receive(SynapseCallbackReceiver.java:195)
    at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
    at org.apache.synapse.transport.passthru.ClientWorker.run(ClientWorker.java:265)
    at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I hope this helps! See you in the next post.

comments powered by Disqus