WSO2 CEP is a lightweight, easy-to-use, open source Complex Event Processing server. Its back-end runtime engine is Siddhi. More information on Siddhi can be found at [1].
This post explains how to write extensions to Siddhi.
An extension lets you add your own custom logic and use it within Siddhi queries. The latest released Siddhi version supports writing extensions for Windows, Transformers, OutputAttributeProcessors and Functions.
I will explain how to write a custom OutputAttributeProcessor with an example.
Writing a custom output aggregator
We'll write an output aggregator that returns the unique count of a given attribute. (Siddhi supports count as an inbuilt output aggregator; we'll improve on it to get a unique count.)
To write an output aggregator:
- write a class implementing org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory
- write a class implementing org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator
Step 1:
We have to give a namespace and a function name to each extension we write. These are declared with the @SiddhiExtension annotation on the class that implements OutputAttributeAggregatorFactory. When implementing this class, we have to override the method createAttributeAggregator. All we have to do here is return a new instance of the class that implements OutputAttributeAggregator (see Step 2).
Let's use “test” as the namespace and “getUniqueCount” as the function name for our custom aggregator. Below is our class:
import org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory;
import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

@SiddhiExtension(namespace = "test", function = "getUniqueCount")
public class UniqueCountAggregatorFactory implements OutputAttributeAggregatorFactory {

    @Override
    public OutputAttributeAggregator createAttributeAggregator(Attribute.Type[] types) {
        return new UniqueCountAggregatorInt();
    }
}
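Step 2:
The class that implements OutputAttributeAggregator holds the actual logic. When writing this class we have to implement several methods:
- getReturnType() : the return type of the aggregator. Can be Int, String, Long, Bool, Double or Float
- processAdd(Object obj) : called with the attribute value when an event is added; returns the updated aggregate value
- processRemove(Object obj) : called with the attribute value when an event is removed (for example, when it expires from a window); returns the updated aggregate value
- newInstance() : returns a fresh instance of the aggregator
- destroy() : here we can do cleanup tasks, such as closing any connections we have created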
import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute;

import java.util.HashMap;
import java.util.Map;

public class UniqueCountAggregatorInt implements OutputAttributeAggregator {

    private static final long serialVersionUID = 1358997438272544590L;

    // For each distinct value, the number of events currently carrying it.
    // The unique count is simply the number of keys in this map.
    private Map<Object, Integer> occurrences = new HashMap<Object, Integer>();

    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.INT;
    }

    @Override
    public Object processAdd(Object obj) {
        Integer count = occurrences.get(obj);
        occurrences.put(obj, count == null ? 1 : count + 1);
        return occurrences.size();
    }

    @Override
    public Object processRemove(Object obj) {
        // Only stop counting a value once its last occurrence has been removed
        Integer count = occurrences.get(obj);
        if (count != null) {
            if (count > 1) {
                occurrences.put(obj, count - 1);
            } else {
                occurrences.remove(obj);
            }
        }
        return occurrences.size();
    }

    @Override
    public OutputAttributeAggregator newInstance() {
        return new UniqueCountAggregatorInt();
    }

    @Override
    public void destroy() {
        // Nothing to clean up
    }
}
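To see how processAdd and processRemove interact when the same value shows up in several events, here is a small sketch that runs without Siddhi. UniqueCounter and the fixed-length window of three events are hypothetical stand-ins for the aggregator and for a Siddhi window. The counter keeps an occurrence count per value, so a value stays counted until its last copy has left the window.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class UniqueCountSketch {

    // Hypothetical stand-in for the aggregator, with no Siddhi dependency
    public static class UniqueCounter {
        // value -> number of events in the window that carry this value
        private final Map<Object, Integer> occurrences = new HashMap<>();

        public int processAdd(Object value) {
            occurrences.merge(value, 1, Integer::sum);
            return occurrences.size();
        }

        public int processRemove(Object value) {
            // Decrement the count; returning null drops the key at zero
            occurrences.computeIfPresent(value, (k, n) -> n > 1 ? n - 1 : null);
            return occurrences.size();
        }
    }

    public static void main(String[] args) {
        UniqueCounter counter = new UniqueCounter();
        Deque<Integer> window = new ArrayDeque<>();
        int windowLength = 3; // assumed window size, for illustration only
        int[] prices = {10, 10, 20, 30, 10};

        for (int price : prices) {
            // The oldest event leaves the window before the new one enters
            if (window.size() == windowLength) {
                counter.processRemove(window.removeFirst());
            }
            window.addLast(price);
            System.out.println(price + " -> " + counter.processAdd(price));
        }
        // Prints the counts 1, 1, 2, 3, 3 (one line per event)
    }
}
```

When the fourth event arrives, one of the two 10s leaves the window, but 10 is still counted because the other copy is still inside.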
This is all you have to do to create a custom output aggregator. You can refer to [2] for more details. We can use this extension in a Siddhi query as follows:
from StockExchangeStream#window.time(1 hour)
select symbol,test:getUniqueCount(price) as uniquePriceCount
group by symbol
insert into outputStream;
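To get a feel for what this query computes, the sketch below mimics it in plain Java, without Siddhi. It assumes that all events fall inside the one-hour window (so nothing expires) and that one output row is produced per incoming event. The class name, the run method and the sample data are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class GroupedUniqueCountSketch {

    // Returns one "symbol:uniquePriceCount" row per input event,
    // keeping a separate set of seen prices for each symbol (the group-by key)
    public static List<String> run(String[] symbols, double[] prices) {
        Map<String, Set<Double>> pricesBySymbol = new HashMap<>();
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < symbols.length; i++) {
            Set<Double> seen = pricesBySymbol.computeIfAbsent(symbols[i], s -> new HashSet<>());
            seen.add(prices[i]);
            rows.add(symbols[i] + ":" + seen.size());
        }
        return rows;
    }

    public static void main(String[] args) {
        String[] symbols = {"WSO2", "IBM", "WSO2", "WSO2", "IBM"};
        double[] prices = {55.5, 75.0, 55.5, 56.0, 76.5};
        System.out.println(run(symbols, prices));
        // prints [WSO2:1, IBM:1, WSO2:1, WSO2:2, IBM:2]
    }
}
```

The repeated WSO2 price of 55.5 does not raise the count, and the IBM prices are counted separately from the WSO2 ones.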
If you want to use the output aggregator extension in WSO2 CEP, follow the steps below:
- compile the two classes and package them into a jar file
- add the jar file to <CEP_HOME>/repository/components/lib
- add the fully-qualified name of the class that implements OutputAttributeAggregatorFactory on a new line in the siddhi.extension file, which can be found at <CEP_HOME>/repository/conf/siddhi
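As an illustration of the last step: the listings in this post do not declare a package, so the package name org.example.siddhi below is purely hypothetical. If the factory class lived in that package, the line added to siddhi.extension might look like this:

```
org.example.siddhi.UniqueCountAggregatorFactory
```

Use whatever package your own classes actually declare.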
[1] http://srinathsview.blogspot.com/2011/12/siddhi-second-look-at-complex-event.html
[2] https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi