How to write your own BusinessEvents Channels
Simply put, TIBCO BusinessEvents provides the ability for customers to develop their own custom channels using Java APIs. It’s not too difficult, an old Java hack like myself has implemented his first one in a few hours.
What’s a Channel? A Channel is TIBCO BusinessEvents (we’ll call it BE from henceforth) way of connecting to the outside world, usually some Message Broker technology such as Apache Kafka or JMS.
BE provides API extensions via a bundle of interfaces and classes bundled in a package named com.tibco.be.custom.channel
Creating your own channel is relatively simple to do by implementing and extending a number of classes provided by the custom channel framework.
In the box, BE does provide a working Kafka channel with source code that showcases these APIs and is located at BE_HOME/api/chanel-api/examples/kafka/src
Channel Lifecycle
Channels follow a simple lifecycle. All channels follow the below sequence that initiate and start the communication of events with the outside world:
All of your channel handling logic will be scattered through these methods so it’s important to understand what each method does.
Rather than briefly discuss each method, I have also provided code extracts to a working channel implementation i’ve written that integrates BE to AWS Simple Queue Service so that you can see in detail the logic required in implementing such a custom channel.
Channel.init() is the entry point to starting the channel lifecycle. Its duty is to perform any initialisation logic that is needed by your channel implementation. It is here that you can either choose to call the super.init() that will call each Destinations init() method for you or you manually implement your own logic to do so.
/**
* We have chosen to override init() It will initialize a common threadpool for all SQS Destinations to use, to start their consumers
*/
@Override
public void init() throws Exception {
executor = Executors.newCachedThreadPool();
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
pool.setCorePoolSize(2);
//Call super.init here, it in turn invokes each of the destinations init methods. This gives a chance to each of the destinations to initialize itself.
super.init();
}
Destination.init() is used to perform any initialisation logic per each defined Destination within your Channel configuration.
public void init() throws Exception {
logger.log(Level.DEBUG,"Initialising SQS Destination");
executor = ((SqsChannel) getChannel()).getJobPool();
try {
threads = Integer.parseInt(getDestinationProperties().getProperty(CONFIG_THREADS));
} catch (Exception e) {
logger.log(Level.ERROR,e,"Unable to parse Consumer Threads for destination");
e.printStackTrace();
}
queueUrl = getDestinationProperties().getProperty(CONFIG_QUEUE_URL);
try {
pollInterval = Integer.parseInt(getDestinationProperties().getProperty(CONFIG_POLL_INTERVAL));
} catch (Exception e) {
logger.log(Level.ERROR,e,"Unable to parse Poll Interval for destination");
e.printStackTrace();
}
try {
maxMessages = Integer.parseInt(getDestinationProperties().getProperty(CONFIG_MAX_MESSAGES));
} catch (Exception e) {
logger.log(Level.ERROR,e,"Unable to parse Max Messages for destination");
e.printStackTrace();
}
logger.log(Level.DEBUG,"Initialisation of SQS Destination completed");
}
Channel.connect() is the entry point to establish connection-orientated transports such as Kafka or JMS. By default the BaseChannel class implementation will call your Destination.connect() method that you implement.
Destination.connect() is used to perform any connection-oriented logic such as establishing client connections and sessions with your message broker per destination. If you are implementing connection-less logic you can leave this method empty.
public void connect() throws Exception {
logger.log(Level.DEBUG,"Connecting to AWS SQS");
AwsCredentialsProvider awsCredentialsProvider = createCredsProviderWithRole();
sqsClient = SqsClient.builder().credentialsProvider(awsCredentialsProvider)
.region(Region.of(getChannel().getChannelProperties().getProperty(CONFIG_AWS_REGION)))
.build();
logger.log(Level.DEBUG,"Successfully connected to AWS SQS");
}
Destination.bind() is used to start and bind your Message Listeners. Your Message Listeners are implemented as runnable and are tasked with handling logic to receive external messages using whatever connections you’ve setup within your Destination. It is in the bind() method that you should perform instantiation of Listener instance. Each Listener instantiation is added to a listeners collection.
private List<SqsListener> listeners = new ArrayList<SqsListener>();public void bind(EventProcessor eventProcessor) throws Exception {
//Create consumer(s) for received EventProcessor, don't start polling yet
logger.log(Level.DEBUG,"Binding Message Receivers to Listener threads");
for (int i = 0; i < threads; i++) {
SqsListener listener = new SqsListener(sqsClient, queueUrl, maxMessages, pollInterval, i, eventProcessor, getSerializer(), getLogger());
listeners.add(listener);
}
logger.log(Level.DEBUG,"Completed binding Message Receivers to Listener threads");
}
Listener.run() does the your ingress ‘heavy-lifting’ i.e. it’s where you’ll either asynchronously block waiting or poll your external system connection for messages.
public void run() {
logger.log(Level.DEBUG,"Listener thread running");
while(true) {
logger.log(Level.DEBUG,"Listener waiting for SQS message");
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(1)
.waitTimeSeconds(pollingInterval)
.build();
List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
Event event = null;
for (Message message : messages) {
try {
event = serializer.deserializeUserEvent(message,null);
} catch(Exception e) {
logger.log(Level.ERROR,"SqsListener : Exception occurred while deserializing message : " +e );
}
if (event != null) {
try {
logger.log(Level.DEBUG,"Dispatching message to Event Processor");
eventProcessor.processEvent(event);
logger.log(Level.DEBUG,"Dispatch completed");
} catch(final Exception e) {
logger.log(Level.ERROR, e, "SqsListener : Exception occurred while processing event : "+ e);
}
}
logger.log(Level.DEBUG,"Deleting SQS message");
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build();
try {
sqsClient.deleteMessage(deleteMessageRequest);
} catch (Exception e) {
logger.log(Level.ERROR, e, "Unable to delete message from SQS");
e.printStackTrace();
}
logger.log(Level.DEBUG,"SQS message deleted");
}
}
}
In the Listener.run() method we make use of the deserializeUserEvent() method to perform deserialisation logic from our external message representation to BE’s CustomEvent object model:
public Event deserializeUserEvent(Object message, Map<String, Object> properties) throws Exception {
logger.log(Level.DEBUG,"De-serializing SQS Message");
Event event = new CustomEvent();
if(message instanceof Message) {
Message m = (Message) message;
String payload = m.body();
String extId = m.messageId();
event.setPayload((payload != null) ? payload.getBytes() : new byte[0]);
event.setExtId(extId);
} else {
return null;
}
return event;
}
Conversely, we need to be able to serialise our BE Event object to our outbound Message object:
public Object serializeUserEvent(EventWithId event, Map<String, Object> properties) throws Exception {
logger.log(Level.DEBUG,"Serializing SQS Message");
Message message = Message.builder()
.messageId(event.getExtId())
.body(new String(event.getPayload()))
.build();
return message;
}
Channel.start() is the entry point to get things moving. By default the Base class implementation calls yours Destination.start() method. This is where you implement any logic to start your listener threads.
public void start() throws Exception {
logger.log(Level.DEBUG,"Starting Listeners");
for(final SqsListener listener : listeners) {
executor.submit(listener);
}
logger.log(Level.DEBUG,"Listeners started");
}
Channel.suspend() is the entry point needed to suspend your channel. By default the Base class implementation calls your Destination.suspend() method.
Channel.resume() is the entry point needed to resume your suspended channel. By default the Base class implementation calls your Destination.resume() method.
Channel.close() is the entry point needed to close your connection. By default the Base class implementation calls your Destination.close() method.
Destination.close() is used to perform any connection-oriented logic to close your destinations connection to the external service e.g. closing sessions and client handles.
public void close() throws Exception {
logger.log(Level.DEBUG,"Closing SQS Client Connection");
sqsClient.close();
logger.log(Level.DEBUG,"SQS Client Connection closed");
}
Destination.send() is used to perform another ‘heavy-lift’ operation of event egress to your chosen external system. It is invoked by Base class implementation every time a Simple Event is published. We make use of the serializeUserEvent() method to serialise our EventWithId object to external format we desire, in this instance it’s a sqs.model.Message object.
public void send(EventWithId event, Map map) throws Exception {
final Message message = (Message) getSerializer().serializeUserEvent(event,null);
SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(new String(event.getPayload()))
.build();
logger.log(Level.DEBUG, "Sending SQS msg %s", event.getPayload());
sqsClient.sendMessage(sendMessageRequest);
logger.log(Level.DEBUG, "Sent SQS msg %s", event.getPayload());
}
Handling UI configuration
TIBCO BusinessEvents Studio automagically creates the design-time UI form that allows you to configure your new custom channel.
All we need to do is provide a simple xml definition file called drivers.xml
<?xml version="1.0" encoding="UTF-8"?>
<drivers>
<driver>
<type>AWS-SQS</type>
<label>AWS-SQS</label>
<class>com.tibco.be.custom.channel.aws.sqs.SqsDriver</class>
<description>AWS SQS Channel</description>
<version>1.0.0.0</version>
<properties>
<property name="aws.region" displayName="AWS Region" type="String" default="eu-west-1" mandatory="true" gvToggle="true"/>
<property name="aws.sqs.access.key" displayName="AWS SQS Access Key" type="String" default="" mandatory="true" gvToggle="true"/>
<property name="aws.sqs.secret.key" displayName="AWS SQS Secret Key" type="String" default="" mandatory="true" gvToggle="true"/>
<property name="aws.sqs.role.arn" displayName="AWS SQS Role ARN" type="String" default="" mandatory="true" gvToggle="true"/>
</properties>
<destinations>
<property name="queue.url" displayName="Queue URL" type="String" default="" mandatory="true" gvToggle="true"/>
<property name="consumer.threads" displayName="Consumer Threads" type="Integer" default="1" mandatory="false" gvToggle="true"/>
<property name="poll.interval" displayName="Poll Interval(sec)" type="Integer" default="30" mandatory="false" gvToggle="true"/>
<property name="max.messages" displayName="Maximum number messages per poll interval" type="Integer" default="1" mandatory="false" gvToggle="true"/>
</destinations>
<serializers userdefined="true">
<serializer type="String" class="com.tibco.be.custom.channel.aws.sqs.serializer.SqsTextSerializer"/>
</serializers>
</driver>
</drivers>
In the drivers.xml file we provide named <property> elements within <properties> for channel level configuration. Within <destination> additional <property> elements are also defined.
Access to the configuration programmatically is via getchannel().getChannelProperties().getProperty() & getDestinationProperties().getProperty() methods
public static final String CONFIG_AWS_SQS_SECRET_KEY = "aws.sqs.secret.key";
public static final String CONFIG_QUEUE_URL = "queue.url";getChannel().getChannelProperties().getProperty(CONFIG_AWS_SQS_ACCESS_KEY)
getDestinationProperties().getProperty(CONFIG_QUEUE_URL);
Deploying your custom channel jar
This is simple, just copy your jar to BE_HOME/lib/ext/tpcl directory and BE will load your channel on engine startup.
So let’s see it running…
No smoke-and-mirrors here; I spent around a working day getting my AWS SQS Channel developed with some of my time trying to figure out what AWS dependencies I needed for it to work, partly due to misleading AWS docs that where out of step with their SDK release.
I also spent a fare bit of time getting federated login to work using the STS APIs, again all the examples in the AWS docs covered SDKv1 APIs and I was using SDKv2 API for SQS integration. If you’re having to deal with authorization issues due Role authz, you’ll be pleased to know createCredsProviderWithRole() method in my code handles this tricky bit of authz logic for you:
private AwsCredentialsProvider createCredsProviderWithRole() throws ExecutionException, InterruptedException {
logger.log(Level.DEBUG,"Establishing AWS Credentials");
AwsCredentialsProvider credsProvider = StaticCredentialsProvider.create(
AwsBasicCredentials.create(
getChannel().getChannelProperties().getProperty(CONFIG_AWS_SQS_ACCESS_KEY),
getChannel().getChannelProperties().getProperty(CONFIG_AWS_SQS_SECRET_KEY)));
StsClient stsClient = StsClient.builder().credentialsProvider(credsProvider).build();
AssumeRoleRequest request = AssumeRoleRequest.builder()
.durationSeconds(3600)
.roleArn(getChannel().getChannelProperties().getProperty(CONFIG_AWS_ROLE_ARN))
.roleSessionName("be")
.build();
StsAssumeRoleCredentialsProvider response = StsAssumeRoleCredentialsProvider
.builder()
.stsClient(stsClient)
.refreshRequest(request)
.build();
AwsSessionCredentials creds = (AwsSessionCredentials) response.resolveCredentials();
AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create(creds.accessKeyId(), creds.secretAccessKey(), creds.sessionToken());
logger.log(Level.DEBUG,"Credentials established");
return AwsCredentialsProviderChain.builder()
.credentialsProviders(StaticCredentialsProvider.create(sessionCredentials))
.build();
}
So, in all the actual time writing the channel was less than a couple of hours effort.
2020 Dec 15 11:47:02.206 GMT Z localhost DEBUG [main] - [custom.channel] Connecting to AWS SQS
2020 Dec 15 11:47:02.207 GMT Z localhost DEBUG [main] - [custom.channel] Establishing AWS Credentials
2020 Dec 15 11:47:03.051 GMT Z localhost DEBUG [main] - [custom.channel] Credentials established
2020 Dec 15 11:47:03.095 GMT Z localhost DEBUG [main] - [custom.channel] Successfully connected to AWS SQS
2020 Dec 15 11:47:03.095 GMT Z localhost INFO [main] - [custom.channel] Channel Connected : /Channels/SQS
2020 Dec 15 11:47:03.096 GMT Z localhost INFO [main] - [custom.channel] Channel:Connected[/Channels/SQS]
2020 Dec 15 11:47:03.096 GMT Z localhost DEBUG [main] - [runtime.session] inference-class binding input channel /Channels/SQS/NewDestination_0 to destination /Channels/SQS
2020 Dec 15 11:47:03.097 GMT Z localhost DEBUG [main] - [custom.channel] Binding Message Receivers to Listener threads
2020 Dec 15 11:47:03.097 GMT Z localhost DEBUG [main] - [custom.channel] Completed binding Message Receivers to Listener threads
2020 Dec 15 11:47:03.097 GMT Z localhost INFO [main] - [custom.channel] Destination:Bind[/Channels/SQS/NewDestination_0]
2020 Dec 15 11:47:03.098 GMT Z localhost DEBUG [main] - [custom.channel] Starting Listeners
2020 Dec 15 11:47:03.098 GMT Z localhost DEBUG [main] - [custom.channel] Listeners started
2020 Dec 15 11:47:03.098 GMT Z localhost INFO [main] - [custom.channel] Channel Started : /Channels/SQS
2020 Dec 15 11:47:03.099 GMT Z localhost INFO [main] - [driver.local] Destination [/Channels/Internal/Dummy] Properties [Size=0 TimeOut=-1]
2020 Dec 15 11:47:03.099 GMT Z localhost INFO [main] - [driver.local] Started Channel [/Channels/Internal]
2020 Dec 15 11:47:03.102 GMT Z localhost DEBUG [pool-3-thread-1] - [custom.channel] Listener thread running
2020 Dec 15 11:47:03.102 GMT Z localhost DEBUG [pool-3-thread-2] - [custom.channel] Listener thread running
2020 Dec 15 11:47:03.102 GMT Z localhost DEBUG [pool-3-thread-2] - [custom.channel] Listener waiting for SQS message
2020 Dec 15 11:47:03.102 GMT Z localhost DEBUG [pool-3-thread-1] - [custom.channel] Listener waiting for SQS message
2020 Dec 15 11:47:03.103 GMT Z localhost DEBUG [main] - [runtime.scheduler] [inference-class] Resuming threadpool [%clusterName%:33050@mmussett-mbpro.local:session-inference-class:602142157:AsyncWorkerService:$default.be.mt$]
2020 Dec 15 11:47:03.103 GMT Z localhost DEBUG [main] - [runtime.scheduler] [inference-class] Resumed threadpool [%clusterName%:33050@mmussett-mbpro.local:session-inference-class:602142157:AsyncWorkerService:$default.be.mt$]
2020 Dec 15 11:47:03.108 GMT Z localhost INFO [main] - [user] [inference-class] Startup
2020 Dec 15 11:47:03.111 GMT Z localhost DEBUG [main] - [kernel.core] [inference-class] RTC Operations: [Object=AdvisoryEvent@id=2, RtcOps=101]
2020 Dec 15 11:47:03.112 GMT Z localhost INFO [main] - [custom.channel] Destination Resumed : /Channels/SQS/NewDestination_0
2020 Dec 15 11:47:03.113 GMT Z localhost INFO [main] - [studio.core] Hot Deployment disabled
2020 Dec 15 11:47:03.120 GMT Z localhost INFO [main] - [runtime.service] Registering all BE-Engine level Group MBeans...
2020 Dec 15 11:47:03.124 GMT Z localhost INFO [main] - [runtime.service] All BE-Engine level Group MBeans SUCCESSFULLY registered
2020 Dec 15 11:47:03.124 GMT Z localhost OFF [main] - [runtime.session] BE Engine localhost started
2020 Dec 15 11:47:03.125 GMT Z localhost INFO [main] - [container.standalone] Starting debugger service...
2020 Dec 15 11:47:08.962 GMT Z localhost DEBUG [pool-3-thread-2] - [custom.channel] De-serializing SQS Message
2020 Dec 15 11:47:08.963 GMT Z localhost DEBUG [pool-3-thread-2] - [custom.channel] Dispatching message to Event Processor
2020 Dec 15 11:47:08.965 GMT Z localhost DEBUG [pool-3-thread-2] - [custom.channel] Dispatch completed
2020 Dec 15 11:47:08.965 GMT Z localhost DEBUG [pool-3-thread-2] - [custom.channel] Deleting SQS message
2020 Dec 15 11:47:08.966 GMT Z localhost INFO [$default.be.mt$.Thread.1] - [user] [inference-class] 1d4349b4-c29a-4f00-ac8c-b22fb75043d9
2020 Dec 15 11:47:08.967 GMT Z localhost INFO [$default.be.mt$.Thread.1] - [user] [inference-class] { "payload" : { "msg": "hello"}}
Source code
Mosey on down to my Github repository to access the code for this tutorial & drop me a comment if you want to see more custom channel contributions created for your BE6 projects!