
Saturday, December 5, 2015

WSO2 ESB - Using File Inbound endpoint

With WSO2 ESB 4.9.0 you can use different inbound endpoints. The ESB has three types of inbound endpoints:
  • Listening Inbound Endpoints
  • Polling Inbound Endpoints
  • Event-Based Inbound Endpoints

In this blog post I'll explain how to create a file inbound endpoint.
The File inbound endpoint falls under Polling Inbound Endpoints.

  1. Start the ESB server.
  2. Go to Inbound Endpoints.
  3. Provide a name and select File as the type. Click Next.
  4. Provide the required details and save.


Sequence is the name of the sequence that is called once a file is read from the location specified in transport.vfs.FileURI. You can read more about the VFS parameters in [1].

A sample inbound endpoint configuration looks as follows:

<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="vfsEndpoint"
sequence="inboundtest"
onError="fault"
protocol="file"
suspend="false">
<parameters>
<parameter name="interval">60</parameter>
<parameter name="transport.vfs.ActionAfterErrors">NONE</parameter>
<parameter name="coordination">true</parameter>
<parameter name="transport.vfs.ContentType">application/xml</parameter>
<parameter name="transport.vfs.LockReleaseSameNode">false</parameter>
<parameter name="transport.vfs.AutoLockRelease">true</parameter>
<parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
<parameter name="transport.vfs.AutoLockReleaseInterval">5000</parameter>
<parameter name="transport.vfs.CreateFolder">true</parameter>
<parameter name="sequential">true</parameter>
<parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
<parameter name="transport.vfs.FileURI">file:///home/sachini/dev/inbooundtest</parameter>
<parameter name="transport.vfs.MoveAfterFailure">file:///home/sachini/dev/inbooundtest/error</parameter>
<parameter name="transport.vfs.DistributedLock">false</parameter>
<parameter name="transport.vfs.FileNamePattern">.*\.xml</parameter>
<parameter name="transport.vfs.MoveAfterProcess">file:///home/sachini/dev/inbooundtest/out</parameter>
<parameter name="transport.vfs.Locking">enable</parameter>
<parameter name="transport.vfs.FileSortAscending">true</parameter>
<parameter name="transport.vfs.FileSortAttribute">NONE</parameter>
</parameters>
</inboundEndpoint>



Once you add the inbound endpoint and put a file (in this case an XML file) in the location specified in transport.vfs.FileURI, the file will be read and the specified sequence will be called.
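
If you want to check which file names a transport.vfs.FileNamePattern value would pick up, a small regular expression test helps. The sketch below compares a pattern with an unescaped dot against one with an escaped dot. It assumes the pattern has to match the whole file name; the class and method names are made up for this illustration and are not part of the ESB.

```java
import java.util.regex.Pattern;

// Hypothetical helper for trying out file name patterns; not part of WSO2 ESB.
final class FileNamePatternCheck {

    // Returns true when the whole file name matches the regular expression.
    static boolean accepts(String pattern, String fileName) {
        return Pattern.matches(pattern, fileName);
    }

    public static void main(String[] args) {
        String loose = ".*.xml";    // the dot before "xml" matches any character
        String strict = ".*\\.xml"; // the escaped dot matches only a literal "."

        System.out.println(accepts(loose, "order.xml"));  // true
        System.out.println(accepts(loose, "order_xml"));  // true
        System.out.println(accepts(strict, "order_xml")); // false
        System.out.println(accepts(strict, "order.txt")); // false
    }
}
```

Note that in the XML configuration the backslash is written once (.*\.xml); it is doubled above only because it sits inside a Java string literal.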






Monday, October 26, 2015

WSO2 ESB unable to consume messages from JMS Queue


When there is a large number of JMS consumers in WSO2 ESB, you'll get a warning like the following:

"WARN {org.apache.axis2.transport.jms.JMSListener} -  Polling tasks on destination : testQueue of type queue for service test_ps have not yet started after 3 seconds ."


And the proxy service will not be able to consume the messages from the queue.

This is because there are not enough threads to consume messages from the JMS queues. You can increase the number of threads used for the JMS transport as described below.

  • Create a file named jms.properties in the <ESB_HOME> directory
  • Add the following properties to the created file
                      snd_t_core=200
                      snd_t_max=250
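
To get a feel for what the two numbers mean, here is a minimal sketch that reads them with java.util.Properties and uses them as the core and maximum sizes of a java.util.concurrent.ThreadPoolExecutor. That the JMS transport feeds the values into a pool of this kind is my assumption, and the class name, keep-alive time and queue type are made up for the example. The one hard rule it shows is that the maximum must not be smaller than the core size.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; this is not the ESB's own code.
final class JmsPoolSettings {

    // Parses snd_t_core and snd_t_max and builds a pool sized with them.
    static ThreadPoolExecutor poolFrom(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int core = Integer.parseInt(props.getProperty("snd_t_core").trim());
        int max = Integer.parseInt(props.getProperty("snd_t_max").trim());
        // The constructor throws IllegalArgumentException if max < core.
        return new ThreadPoolExecutor(core, max, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = poolFrom("snd_t_core=200\nsnd_t_max=250\n");
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize()); // 200 / 250
        pool.shutdown();
    }
}
```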

Saturday, October 3, 2015

Writing a custom function in Siddhi 3.0

When writing a custom function you simply have to:
  1. Extend the FunctionExecutor class.
  2. Create the corresponding .siddhiext extension mapping. The name of the extension mapping file should be in the format <Namespace>.siddhiext. The file should contain the function name and the fully qualified class name of the extension class.
  3. Compile the class and build the jar containing the .class and .siddhiext files. Add the jar to the Siddhi classpath.


My custom function name is plus and namespace is custom.


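package sample.query.extension;

// Imports below assume the Siddhi 3.0 package layout
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
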
public class PlusFunctionExtension extends FunctionExecutor {

    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) {
            Attribute.Type attributeType = expressionExecutor.getReturnType();
            if (attributeType == Attribute.Type.STRING || attributeType == Attribute.Type.BOOL || attributeType == Attribute.Type.OBJECT) {
               throw new ExecutionPlanCreationException("Plus cannot have parameters with types String or Bool or object");
            }
        }
    }

    @Override
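    protected Object execute(Object[] data) {
        // Several arguments: add them all up as doubles
        double sum = 0;
        for (Object obj : data) {
            sum += ((Number) obj).doubleValue();
        }
        return sum;
    }

    @Override
    protected Object execute(Object data) {
        // Single argument: convert it so that the result matches the
        // declared DOUBLE return type
        if (data == null) {
            return null;
        }
        return ((Number) data).doubleValue();
    }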

    @Override
    public void start() {

    }

    @Override
    public void stop() {

    }

    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.DOUBLE;
    }

    @Override
    public Object[] currentState() {
        return null;
    }

    @Override
    public void restoreState(Object[] state) {

    }
}

The name of my mapping file is custom.siddhiext, and it has the entry below:
               plus=sample.query.extension.PlusFunctionExtension

This extension can be referred to in a Siddhi query as follows:

from cseEventStream
select symbol, custom:plus(price, tax) as total
insert into outputStream;

Now that you have seen a sample function extension, I'll briefly explain the methods that have to be implemented when creating a custom function.

init : the attributeExpressionExecutors related to the parameters of the function are passed here. In the above function, the attributeExpressionExecutors related to the price and tax attributes are passed to the init method.

execute : this method contains the actual functionality. The values of the attributes passed in the query are passed as an object array.

start and stop : these methods are not necessary for this function. If you need to create connections, such as database connections, you can do so in the start method and close them in the stop method.
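
If you want to try the summing logic without setting up Siddhi, it can be pulled out into a plain Java class. This is only a sketch of what execute does with the object array; the class name PlusSketch and the sample values are made up, and no Siddhi classes are involved.

```java
// Standalone sketch of the summing logic; no Siddhi classes are involved.
final class PlusSketch {

    // Mirrors execute(Object[] data): every argument is treated as a Number
    // and added up as a double.
    static double plus(Object[] data) {
        double sum = 0;
        for (Object value : data) {
            sum += ((Number) value).doubleValue();
        }
        return sum;
    }

    public static void main(String[] args) {
        // An int price and a float tax, as they might arrive from a stream
        Object[] data = {100, 12.5f};
        System.out.println(plus(data)); // 112.5
    }
}
```

Casting to Number is what lets int, long, float and double attributes be mixed in one call, which is also why init rejects String, Bool and Object parameters up front.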

Wednesday, September 30, 2015

Extending Siddhi 3.0


Siddhi [1] 3.0.0 has been released with lots of new features and improved performance. Some of the main features supported by Siddhi are:
  • Filter 
  • Join 
  • Aggregation 
  • Stream handlers (e.g. windows) 
  • Pattern processing 
  • Sequence processing 
  • Event Tables

In addition to the above, Siddhi facilitates writing various types of extensions via its pluggable architecture. Siddhi also ships with a built-in set of extensions.

Beyond these extensions, we can easily write our own 
  1. aggregators
  2. functions
  3. windows
  4. stream functions 
  5. stream processors 
which can be used in Siddhi queries. It's simply a matter of extending a Siddhi interface and adding the mapping file that maps the extension class to its function name and namespace.

Check my next blog posts for detailed info on writing extensions for each of the five types.



You can read more about extensions from [2] and [3].


[1] https://github.com/wso2/siddhi
[2] https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-SiddhiExtensions
[3] https://docs.wso2.com/display/CEP400/Writing+Extensions+to+Siddhi

Friday, May 15, 2015

Using sonar to analyze code quality of maven projects

  • Download SonarQube
  • Start SonarQube: in my case I started SonarQube by running ./sonar.sh start from the directory sonarqube-4.4/bin/linux-x86-64
  • Go to your project directory and run the command mvn sonar:sonar
  • You can view the results at http://localhost:9000/

Tuesday, March 10, 2015

WSO2 ESB - Enrich Mediator

The Enrich Mediator can process a message based on a given source configuration and then perform the specified action on the message by using the target configuration. It is often used to make small modifications to the payload.

This post explains some scenarios where the Enrich mediator can be used.

Original payload

<orders>
<order>
<price>50.00</price>
<quantity>500</quantity>
<symbol>IBM</symbol>
<comment>REF 10053</comment>
</order>
<order>
<price>18.00</price>
<quantity>500</quantity>
<symbol>MSFT</symbol>
<comment>ref 20088398289</comment>
</order>
</orders>

Required payload-1

<orders>
<order>
<price>50.00</price>
<quantity>500</quantity>
<symbol>IBM</symbol>
<comment>REF 10053</comment>
</order>
<order>
<price>18.00</price>
<quantity>500</quantity>
<symbol>MSFT</symbol>
<comment>ref 20088398289</comment>
</order>
</orders>
<orderID>2</orderID>

mediator

<property name="orderID" scope="default" description="orderID">
    <orderID xmlns="">2</orderID>
</property>
<enrich>
    <source clone="true" xpath="$ctx:orderID"/>
    <target action="sibling" xpath="//orders"/>
</enrich>

Define a property <orderID>2</orderID>
Then add the property as a sibling of <orders>


Required payload-2

<orders>
<order>
<price>50.00</price>
<quantity>500</quantity>
<symbol>IBM</symbol>
<comment>REF 10053</comment>
</order>
<order>
<price>18.00</price>
<quantity>500</quantity>
<symbol>MSFT</symbol>
<comment>ref 20088398289</comment>
</order>
<orderID>2</orderID>
</orders>

mediator

<property name="orderID" scope="default" description="orderID">
    <orderID xmlns="">2</orderID>
</property>
<enrich>
    <source clone="true" xpath="$ctx:orderID"/>
    <target action="child" xpath="//orders"/>
</enrich>

Define a property <orderID>2</orderID>
Then add the property as a child of <orders>
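
The difference between the sibling and child actions can be mimicked with plain DOM calls. The sketch below is only an analogy and not how the Enrich mediator is implemented; the <body> wrapper is a hypothetical stand-in for the element that surrounds the payload, and the class and method names are made up.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Plain DOM analogy for the two target actions; not the mediator's own code.
final class SiblingVsChild {

    // Adds <orderID>2</orderID> either as a child of <orders> or as its next
    // sibling, and returns the name of the new element's parent.
    static String parentAfterInsert(String action) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(
                            "<body><orders><order/></orders></body>")));
            Element orders = (Element) doc.getElementsByTagName("orders").item(0);
            Element orderId = doc.createElement("orderID");
            orderId.setTextContent("2");
            if ("child".equals(action)) {
                orders.appendChild(orderId);
            } else {
                // insertBefore with a null reference node appends at the end
                orders.getParentNode().insertBefore(orderId, orders.getNextSibling());
            }
            return orderId.getParentNode().getNodeName();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parentAfterInsert("sibling")); // body
        System.out.println(parentAfterInsert("child"));   // orders
    }
}
```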


Required payload-3

<orders>
<order>
<price>50.00</price>
<quantity>500</quantity>
<symbol>IBM</symbol>
<comment>REF 10053</comment>
<volume>100</volume>
</order>
<order>
<price>18.00</price>
<quantity>500</quantity>
<symbol>MSFT</symbol>
<comment>ref 20088398289</comment>
</order>
<orderID>2</orderID>
</orders>

mediator

<property name="VOLUME" value="100"/>
<enrich>
    <source type="inline" clone="true">
        <volume xmlns=""/>
    </source>
    <target action="child" xpath="//orders/order"/>
</enrich>
<enrich>
    <source type="property" clone="true" property="VOLUME"/>
    <target xpath="//orders/order/volume"/>
</enrich>

The first enrich mediator adds the element <volume> as a child of <order>.
The second enrich mediator sets the value of the “VOLUME” property as the content of the newly created element.



Required payload-4

<orders>
<order>
<price>50.00</price>
<quantity>500</quantity>
<symbol>IBM</symbol>
<comment>REF 10053</comment>
<volume>100</volume>
</order>
<order>
<price>18.00</price>
<quantity>500</quantity>
<symbol>MSFT</symbol>
<volume>100</volume>
<comment>ref 20088398289</comment>
</order>
<orderID>2</orderID>
</orders>

mediator

<property name="VOLUME" value="100"/>
<enrich>
    <source type="inline" clone="true">
        <volume xmlns=""/>
    </source>
    <target action="child" xpath="//orders/order[symbol/text() = 'MSFT']"/>
</enrich>
<enrich>
    <source type="property" clone="true" property="VOLUME"/>
    <target xpath="//orders/order[symbol/text() = 'MSFT']/volume"/>
</enrich>


Instead of hard-coding 'MSFT' in both XPath expressions, we can store the value 'MSFT' in a property and then use it in the comparisons as below.

<property name="VOLUME" value="100"/>
<property name="SYMBOL" value="MSFT"/>
<enrich>
    <source type="inline" clone="true">
        <volume xmlns=""/>
    </source>
    <target action="child"
            xpath="//orders/order[symbol/text() = get-property('SYMBOL')]"/>
</enrich>
<enrich>
    <source type="property" clone="true" property="VOLUME"/>
    <target xpath="//orders/order[symbol/text() = get-property('SYMBOL')]/volume"/>
</enrich>
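
To see what the predicate in these target expressions selects, you can evaluate the XPath outside the ESB with the standard javax.xml.xpath API. This is a rough sketch with a trimmed-down payload and a made-up class name. It uses the literal 'MSFT', because get-property() is a Synapse XPath extension and is not available in plain Java XPath.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper that counts the nodes an XPath expression selects.
final class OrderXPathCheck {

    // Trimmed-down version of the orders payload
    static final String PAYLOAD =
            "<orders>"
            + "<order><symbol>IBM</symbol></order>"
            + "<order><symbol>MSFT</symbol></order>"
            + "</orders>";

    static int countMatches(String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList nodes = (NodeList) xpath.evaluate(expression,
                    new InputSource(new StringReader(PAYLOAD)), XPathConstants.NODESET);
            return nodes.getLength();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countMatches("//orders/order"));                           // 2
        System.out.println(countMatches("//orders/order[symbol/text() = 'MSFT']"));   // 1
    }
}
```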

Monday, December 29, 2014

Writing Siddhi Extensions



WSO2 CEP is a lightweight, easy-to-use, open source Complex Event Processing server. The back-end runtime engine of WSO2 CEP is Siddhi. More info on Siddhi can be found at [1].

This post explains how to write extensions to siddhi.

An extension allows you to add and use your own custom logic within Siddhi queries. The latest released Siddhi version supports writing extensions for Windows, Transformers, OutputAttributeProcessors and Functions.

I will explain how to write a custom OutputAttributeProcessor with an example.

Writing custom output aggregator

We'll write an output aggregator which gives the unique count of a given attribute. (Siddhi supports count as an inbuilt output aggregator; we'll improve on it to get a unique count.)

To write an output aggregator:
  • write a class implementing org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory 
  • write a class implementing org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator. 

Step 1:

We have to give a namespace and a function name to each extension we write. These have to be included in the class which implements OutputAttributeAggregatorFactory. When implementing this class, we have to override the method createAttributeAggregator. Here, all we have to do is return a new instance of the class we create by implementing OutputAttributeAggregator (refer to step 2).

Let's give namespace as “test” and function as “getUniqueCount” for our custom aggregator. Below is our class,

import org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory; 
import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator; 
import org.wso2.siddhi.query.api.definition.Attribute; 
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; 


@SiddhiExtension(namespace = "test", function = "getUniqueCount")
public class UniqueCountAggregatorFactory implements OutputAttributeAggregatorFactory {

    @Override
    public OutputAttributeAggregator createAttributeAggregator(Attribute.Type[] types) {
        return new UniqueCountAggregatorInt();
    }
}

Step 2:

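The class which implements OutputAttributeAggregator has the actual logic. When writing this class we have to implement several methods.

  • getReturnType() : return type of the aggregator. Can be Int, String, Long, Bool, Double or Float 
  • processAdd(Object obj) : called with a value that is added to the aggregation; returns the updated result 
  • processRemove(Object obj) : called with a value that is removed from the aggregation; returns the updated result 
  • newInstance() : returns a new instance of the aggregator 
  • destroy() : here we can do cleanup tasks, such as closing connections if we have created any 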

import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator; 
import org.wso2.siddhi.query.api.definition.Attribute; 

import java.util.HashMap;
import java.util.Map;


public class UniqueCountAggregatorInt implements OutputAttributeAggregator {
    private static final long serialVersionUID = 1358997438272544590L;
    // Occurrences of each distinct value currently being aggregated
    private Map<Object, Integer> valueCounts = new HashMap<Object, Integer>();

    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.INT;
    }


    @Override
    public Object processAdd(Object obj) {
        Integer count = valueCounts.get(obj);
        valueCounts.put(obj, count == null ? 1 : count + 1);
        return valueCounts.size();
    }


    @Override
    public Object processRemove(Object obj) {
        // Forget a value only when its last occurrence is removed, so that
        // duplicates still being aggregated keep it counted
        Integer count = valueCounts.get(obj);
        if (count != null && count > 1) {
            valueCounts.put(obj, count - 1);
        } else {
            valueCounts.remove(obj);
        }
        return valueCounts.size();
    }


    @Override 
    public OutputAttributeAggregator newInstance() { 
        return new UniqueCountAggregatorInt(); 
    } 


    @Override 
    public void destroy() { 
    } 
}
This is all you have to do to create a custom output aggregator. You can refer to [2] for more details.
We can use this extension in a Siddhi query as follows:

         from StockExchangeStream#window.time(1 hour)
         select symbol, test:getUniqueCount(price) as uniquePriceCount
         group by symbol
         insert into outputStream;

If you want to use the implemented output aggregator extension in WSO2 CEP, you have to follow the steps below:

  • compile the two classes and package them into a jar file 
  • add the jar file to <CEP_HOME>/repository/components/lib 
  • add the fully-qualified class name of the class which implements OutputAttributeAggregatorFactory on a new line in the siddhi.extension file, which can be found at <CEP_HOME>/repository/conf/siddhi 


[1] http://srinathsview.blogspot.com/2011/12/siddhi-second-look-at-complex-event.html
[2] https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi