
Monday, October 26, 2015

WSO2 ESB unable to consume messages from JMS Queue


When WSO2 ESB has a large number of JMS consumers, you may see a warning like this:

"WARN {org.apache.axis2.transport.jms.JMSListener} -  Polling tasks on destination : testQueue of type queue for service test_ps have not yet started after 3 seconds ."


When this happens, the proxy service cannot consume messages from the queue.

This is because there are not enough threads to consume messages from the JMS queues. You can increase the number of threads used by the JMS transport as follows.

  • Create a file named jms.properties in the <ESB_HOME> directory
  • Add the following properties to the file
                      snd_t_core=200
                      snd_t_max=250

Saturday, October 3, 2015

Writing a custom function in Siddhi 3.0

When writing a custom function, you simply have to:
  1. Extend the FunctionExecutor class.
  2. Create the corresponding .siddhiext extension mapping file. The file name should have the format <Namespace>.siddhiext, and the file should contain the function name and the fully qualified class name of the extension class.
  3. Compile the class and build a jar containing the .class and .siddhiext files, then add the jar to the Siddhi classpath.


My custom function is named plus and its namespace is custom.


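package sample.query.extension;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
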
public class PlusFunctionExtension extends FunctionExecutor {

    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) {
            Attribute.Type attributeType = expressionExecutor.getReturnType();
            if (attributeType == Attribute.Type.STRING || attributeType == Attribute.Type.BOOL || attributeType == Attribute.Type.OBJECT) {
               throw new ExecutionPlanCreationException("Plus cannot have parameters with types String or Bool or object");
            }
        }
    }

    @Override
    protected Object execute(Object[] data) {
        double sum = 0;
        // Every parameter was checked in init, so each value can be treated as a Number
        for (Object obj : data) {
            sum += ((Number) obj).doubleValue();
        }
        return sum;
    }

    @Override
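    protected Object execute(Object data) {
        // Single-parameter case: convert to double so the value matches getReturnType()
        return ((Number) data).doubleValue();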
    }

    @Override
    public void start() {

    }

    @Override
    public void stop() {

    }

    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.DOUBLE;
    }

    @Override
    public Object[] currentState() {
        return null;
    }

    @Override
    public void restoreState(Object[] state) {

    }
}

My mapping file is named custom.siddhiext and contains the following entry:
               plus=sample.query.extension.PlusFunctionExtension
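
The entry has the key=value shape of a Java properties file. As a rough sketch, assuming the mapping is read like a plain properties file (the class name SiddhiExtMappingSketch and the parse helper are made up for illustration and are not part of Siddhi), you can check what the entry resolves to like this:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not a Siddhi class: reads .siddhiext text as key=value pairs.
class SiddhiExtMappingSketch {

    // Assumption: each line maps a function name to a fully qualified class name.
    static Properties parse(String content) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mapping;
    }

    public static void main(String[] args) {
        Properties mapping = parse("plus=sample.query.extension.PlusFunctionExtension\n");
        // The function name "plus" is the key, the extension class is the value.
        System.out.println(mapping.getProperty("plus"));
    }
}
```

The namespace (custom) does not appear inside the file; it comes from the file name.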

This extension can be referenced in a Siddhi query as follows:

from cseEventStream
select symbol, custom:plus(price, tax) as total
insert into outputStream;

Now that you have seen a sample function extension, I'll briefly explain the methods that have to be implemented when creating a custom function.

init : the attributeExpressionExecutors for the function's parameters are passed here. In the query above, the executors for the price and tax attributes are passed to the init method.

execute : this method contains the actual functionality. The values of the attributes passed in the query are passed in as an object array.
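
If you want to try the summation without Siddhi on the classpath, the loop from execute can be pulled out into a standalone method. This is only a sketch: PlusExecuteSketch and its plus method are illustrative names, and the input row stands in for the values Siddhi would pass for the query parameters.

```java
// Hypothetical standalone version of the execute(Object[] data) loop shown above.
class PlusExecuteSketch {

    // Sums the values as doubles; each element is expected to be a Number.
    static double plus(Object[] data) {
        double sum = 0;
        for (Object obj : data) {
            sum += ((Number) obj).doubleValue();
        }
        return sum;
    }

    public static void main(String[] args) {
        // Mixed numeric types (int, float, double) are all handled via Number.
        Object[] row = {10, 2.5f, 0.25};
        System.out.println(plus(row)); // prints 12.75
    }
}
```

Casting to Number is what lets the function accept int, long, float and double parameters alike, which is why init rejects the String, Bool and object types up front.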

start and stop : these methods are not needed for this function. If your function uses a resource such as a database connection, you can open it in start and close it in stop.
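
To make the start and stop idea concrete, here is a minimal sketch of that lifecycle outside Siddhi. Everything in it is hypothetical: FakeConnection stands in for a real resource such as a database connection, and LifecycleSketch only mirrors the two method names of the extension.

```java
// Hypothetical sketch of the start/stop lifecycle; not a Siddhi class.
class LifecycleSketch {

    // Stand-in for an external resource such as a database connection.
    static class FakeConnection implements AutoCloseable {
        private boolean open = true;

        boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            open = false;
        }
    }

    private FakeConnection connection;

    // Mirrors start(): acquire the resource once, before it is used.
    void start() {
        connection = new FakeConnection();
    }

    // Mirrors stop(): release the resource when the function is shut down.
    void stop() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    boolean isConnected() {
        return connection != null && connection.isOpen();
    }

    public static void main(String[] args) {
        LifecycleSketch function = new LifecycleSketch();
        function.start();
        System.out.println(function.isConnected()); // prints true
        function.stop();
        System.out.println(function.isConnected()); // prints false
    }
}
```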