Project Flogo — How to develop Flogo Activity Extensions

mmussett
9 min read · May 11, 2020

What’s Project Flogo?

Project Flogo is an ultra-light, Go-based open source ecosystem for building event-driven apps. Event-driven, you say? Yup, the notion of triggers and actions is leveraged to process incoming events. An action is a common interface that exposes key capabilities such as application integration, stream processing, etc.

  • App = Trigger(s) + Actions[&Activities]
  • Triggers: receive data from external sources, are managed by a configurable threading model, and have a common interface enabling anyone to build a Flogo trigger.
  • Handlers: dispatch events to actions
  • Actions: process events in a manner suited to the implementation and have a common interface enabling opinionated event processing capabilities

All capabilities within the Flogo Ecosystem have a few things in common: they all process events (in a manner suitable for the specific purpose) and they all implement the action interface exposed by Flogo Core.
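To make the trigger, handler and action relationship concrete, here is a minimal sketch in plain Go. The Action, Handler and Trigger types below are hypothetical stand-ins invented for illustration; they are not the interfaces exposed by Flogo Core, which carry more detail.

```go
package main

import "fmt"

// Action is a hypothetical stand-in for an action: anything that can
// process an event.
type Action interface {
	Run(event map[string]interface{}) (string, error)
}

// Handler dispatches an event received by a trigger to one action.
type Handler struct {
	action Action
}

// Handle forwards the event to the wrapped action.
func (h Handler) Handle(event map[string]interface{}) (string, error) {
	return h.action.Run(event)
}

// Trigger receives data from an external source. Here the source is
// simulated by calling Fire directly.
type Trigger struct {
	handlers []Handler
}

// Fire passes the event to every registered handler and collects the results.
func (t *Trigger) Fire(event map[string]interface{}) ([]string, error) {
	results := make([]string, 0, len(t.handlers))
	for _, h := range t.handlers {
		r, err := h.Handle(event)
		if err != nil {
			return nil, err
		}
		results = append(results, r)
	}
	return results, nil
}

// greetAction is a trivial action that formats a greeting.
type greetAction struct{}

func (greetAction) Run(event map[string]interface{}) (string, error) {
	name, ok := event["name"].(string)
	if !ok {
		return "", fmt.Errorf("event has no string field %q", "name")
	}
	return "hello, " + name, nil
}

func main() {
	trg := &Trigger{handlers: []Handler{{action: greetAction{}}}}
	results, err := trg.Fire(map[string]interface{}{"name": "Flynn"})
	fmt.Println(results, err)
}
```

The point of the shape is that the trigger knows nothing about what the action does; swapping greetAction for another Action implementation needs no change on the trigger side.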

Why should I use Flogo?

Flogo is a development tool that lets you quickly build lightweight Golang-based applications with minimal fuss.

Some of the key highlights include:

  • Ultra-light: 20x-50x lighter than Java or Node.js
  • Event-driven: powerful event-driven programming model based on triggers and actions
  • Common core: a single, common core enables reuse and flexibility across all eventing constructs
  • Golang based: written entirely in Golang for efficiency (OK, I said that already)
  • Deployment flexibility: deploy as ultra-lightweight serverless functions, containers or static binaries on IoT edge devices
  • Native machine learning: purpose-built activity for TensorFlow SavedModel inferencing
  • 100% Open Source: for your dev & hacking pleasure

Did I tell you it’s also written in Golang :-)

So what else does Flogo do?

Well, apart from being written in Golang (OK, I’ll stop it), Flogo was designed to solve some interesting use cases such as:

  • Integration Flows: application integration process engine with conditional branching and a visual development environment
  • Stream Processing: a simple pipeline-based stream processing action with event joining capabilities across multiple triggers & aggregation over time windows
  • Contextual Decisioning: declarative rules for real-time contextual decisions
  • Microgateway: microgateway pattern for conditional, content-based routing, JWT validation, rate limiting, circuit breaking and other common patterns

The concept is simple: an event is just that, an event. How it’s processed is what differs. Flogo Core eases the burden by providing a common set of functionality, such as:

  • threading
  • logging
  • data type coercion
  • data mapping
  • tracing & monitoring hooks

Flogo Core also exposes a common set of contributions via activities and triggers. For example, all available triggers can be leveraged to dispatch events to any action implementation: flows for application integration, streams for stream processing, rules for contextual rule processing, etc.

Flogo Core

Flogo Core is an event-driven app framework used to develop apps for the cloud & IoT edge. It can also be thought of as a lightweight app kernel used by open source & commercial solutions.

Flogo Core provides the following key benefits:

  • Action chaining: enables communication between one or more capabilities in a single, sub-10MB binary!
  • Common contribution model: build activities and triggers that can be leveraged by all capabilities
  • Extensible: easily extend the capabilities available by building your own action using the common interfaces

Flogo Core Contribution Model

Flogo Core exposes three principal contribution interfaces that enable developers to build common capabilities and functionality. These contribution interfaces include:

  • Trigger Interface: a common interface for building event consumers that dispatch events to one or more actions. The Kafka subscriber is an example of a trigger.
  • Activity Interface: a common interface for exposing common application logic in a reusable manner. Think of this as a function, such as write to database, publish to Kafka, etc., that can be used by all Flogo apps.
  • Action Interface: a common interface for processing events. Actions contain the specific capability logic, such as integration, stream processing, rule processing, etc. Actions have a great deal of flexibility in how they’re developed and how developers leverage actions within their overall applications. For example, flows and streams expose JSON-based DSLs & Go APIs for maximum developer flexibility.
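The idea of an activity as a reusable function can be sketched as follows. This is an illustration only: the Activity interface here is a simplified, hypothetical contract (named inputs in, named outputs out), and runFlow and runStream are made-up helpers that stand in for two different kinds of action calling the same activity.

```go
package main

import (
	"fmt"
	"strings"
)

// Activity is a hypothetical, simplified activity contract.
type Activity interface {
	Eval(inputs map[string]interface{}) (map[string]interface{}, error)
}

// upperActivity is a small reusable unit of logic.
type upperActivity struct{}

func (upperActivity) Eval(inputs map[string]interface{}) (map[string]interface{}, error) {
	text, ok := inputs["text"].(string)
	if !ok {
		return nil, fmt.Errorf("input %q must be a string", "text")
	}
	return map[string]interface{}{"result": strings.ToUpper(text)}, nil
}

// runFlow models an integration-style action: one event, one activity call.
func runFlow(a Activity, event string) (interface{}, error) {
	out, err := a.Eval(map[string]interface{}{"text": event})
	if err != nil {
		return nil, err
	}
	return out["result"], nil
}

// runStream models a stream-style action: the same activity applied to a batch.
func runStream(a Activity, events []string) ([]interface{}, error) {
	results := make([]interface{}, 0, len(events))
	for _, e := range events {
		r, err := runFlow(a, e)
		if err != nil {
			return nil, err
		}
		results = append(results, r)
	}
	return results, nil
}

func main() {
	act := upperActivity{}
	one, _ := runFlow(act, "ping")
	many, _ := runStream(act, []string{"a", "b"})
	fmt.Println(one, many)
}
```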

And?!?

The propeller-head engineers over at TIBCO decided that the world needed another cute mascot too…

Meet Flogo Flynn…

He’s the lovable mascot for Project Flogo and I want to write about how easy it is to create extensions to the Flogo platform so that you can too!

OK, show me the money!

Got you hooked yet? Want to learn more? Then read on…

Let’s play…

For the purpose of learning something new and cool, I’m going to use a proper integration requirement: sending data to the Apache Pulsar message broker. All the stuff we’re using here is open source.

Apache Pulsar, for those not in the know, was created by Yahoo as a pub-sub messaging system and is now part of the Apache Software Foundation. It is an ideal messaging layer for distributed components that communicate using the event-driven messaging patterns found in microservices architectures.

For the purposes of this article I’m going to be using TIBCO’s Apache Pulsar Community Edition, which can be used to run up to 100 client instances in a production environment (you read that right: FREE to use in a production environment!).

· https://www.tibco.com/resources/product-download/apache-pulsar-linux

· https://www.tibco.com/resources/product-download/apache-pulsar-macos

For Flogo to use Apache Pulsar for pub-sub messaging, we’re going to need suitable extensions to Flogo in the form of an Activity and a Trigger. At the time of writing there appears to be no way to integrate Flogo and Apache Pulsar (I chose this use case on purpose).

In this post we’re going to introduce Flogo Flynn to the world of Apache Pulsar and develop a Flogo Activity to publish messages. In part two we’ll add a Flogo Trigger to support the subscription pattern.

Luckily enough (for you) Flogo is a fully extensible open-source platform that lets you add new extension points without too much trouble (in Golang).

Building the Apache Pulsar Flogo Activity

We’ll start like most coders do, by borrowing some template code. There’s no point reinventing the wheel.

I’ve already downloaded Flogo’s CLI to my machine, so the first step is to clone the example code provided by Flogo core libs.

git clone https://github.com/project-flogo/core

Copy the template activity code from the core/examples/activity folder to our chosen Go src folder (named pulsar):

mkdir pulsar; cd $_

cp -R $GOPATH/src/github.com/project-flogo/core/examples/activity/* .

Flogo uses metadata within descriptor.json to describe to the Flogo Web UI what the activity is called and a few other things.

  • name: The name of the activity (this should match the name of the folder the activity is in, like HelloWorld)
  • type: This describes the type of contribution this is (this should be flogo:activity in this case)
  • version: The version of the activity (it is recommended to use semantic versioning for your activities)
  • title: The application title to display in the Flogo Web UI
  • description: A brief description of your activity (this is displayed in the Flogo Web UI)
  • ref: The Go package reference that will be used by the Flogo Web UI to fetch the contribution upon installation
  • author: This is you!
  • settings: An array of name/type pairs that describe the activity settings. Note that activity settings are pre-compiled and can be used to increase performance; they are not fetched for every invocation
  • input: An array of name/type pairs that describe the input to the activity
  • output: An array of name/type pairs that describe the output of the activity
"name": "pulsar",
"type": "flogo:activity",
"version": "0.0.1",
"title": "Pulsar Send Activity",
"description": "Pulsar Send Activity",
"homepage": "https://github.com/mmussett/tree/master/flogo-components/activity/pulsar",
"ref": "github.com/mmussett/flogo-components/activity/pulsar",
"author": "Mark Mussett <mmussett@me.com>",

We need to provide input to the Flogo Activity and we do this by adding objects to the inputs array. We’ll add properties to allow input of the Apache Pulsar broker “url”, the “topic”, and the “payload” values:

"inputs":[
  {
    "name": "url",
    "type": "string",
    "description": "Apache Pulsar connection url",
    "required": true
  },
  {
    "name": "topic",
    "type": "string",
    "description": "Topic to send messages to",
    "required": true
  },
  {
    "name": "payload",
    "type": "string",
    "description": "Message content to send",
    "required": true
  }
]

We’re not expecting any output from the Activity and will therefore leave the outputs array empty.

The completed descriptor.json looks like this:

{
  "name": "pulsar",
  "type": "flogo:activity",
  "version": "0.0.1",
  "title": "Pulsar Send Activity",
  "description": "Pulsar Send Activity",
  "homepage": "https://github.com/mmussett/tree/master/flogo-components/activity/pulsar",
  "ref": "github.com/mmussett/flogo-components/activity/pulsar",
  "author": "Mark Mussett <mmussett@me.com>",
  "settings": [
  ],
  "inputs": [
    {
      "name": "url",
      "type": "string",
      "description": "Apache Pulsar connection url",
      "required": true
    },
    {
      "name": "topic",
      "type": "string",
      "description": "Topic to send message to",
      "required": true
    },
    {
      "name": "payload",
      "type": "string",
      "description": "Message content to send",
      "required": true
    }
  ],
  "outputs": [
  ]
}
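Before wiring the descriptor into Go code, it can help to check that the JSON parses and declares the inputs you expect. The following is a small sketch using only the standard library. The descriptor and field types, and the parseDescriptor and requiredInputs helpers, are hypothetical names made up for this check; this is not how Flogo itself loads descriptors.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// field mirrors one entry of the settings, inputs or outputs arrays.
type field struct {
	Name     string `json:"name"`
	Type     string `json:"type"`
	Required bool   `json:"required"`
}

// descriptor mirrors only the keys needed for this check.
type descriptor struct {
	Name    string  `json:"name"`
	Type    string  `json:"type"`
	Version string  `json:"version"`
	Ref     string  `json:"ref"`
	Inputs  []field `json:"inputs"`
	Outputs []field `json:"outputs"`
}

// sample is a trimmed copy of the descriptor from this article.
const sample = `{
  "name": "pulsar",
  "type": "flogo:activity",
  "version": "0.0.1",
  "ref": "github.com/mmussett/flogo-components/activity/pulsar",
  "inputs": [
    {"name": "url", "type": "string", "required": true},
    {"name": "topic", "type": "string", "required": true},
    {"name": "payload", "type": "string", "required": true}
  ],
  "outputs": []
}`

// parseDescriptor decodes the JSON and rejects anything that is not an activity.
func parseDescriptor(raw string) (*descriptor, error) {
	var d descriptor
	if err := json.Unmarshal([]byte(raw), &d); err != nil {
		return nil, err
	}
	if d.Type != "flogo:activity" {
		return nil, fmt.Errorf("unexpected contribution type %q", d.Type)
	}
	return &d, nil
}

// requiredInputs returns the names of all inputs marked as required.
func requiredInputs(d *descriptor) []string {
	var names []string
	for _, in := range d.Inputs {
		if in.Required {
			names = append(names, in.Name)
		}
	}
	return names
}

func main() {
	d, err := parseDescriptor(sample)
	if err != nil {
		fmt.Println("invalid descriptor:", err)
		return
	}
	fmt.Println(d.Name, requiredInputs(d))
}
```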

Once we have defined our settings, inputs, and outputs we need to codify how this data is made available to your business logic. This is done within the metadata.go file.

The metadata.go file contains the struct type definitions for settings, input, and output; whatever we declare in descriptor.json must also be defined here.

type Settings struct {
}

type Input struct {
	Url     string `md:"url,required"`
	Topic   string `md:"topic,required"`
	Payload string `md:"payload,required"`
}

type Output struct {
}
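The md struct tags are what tie each Go field back to a name in the descriptor. As a rough illustration of what such a tag carries, the sketch below reads the tags with the standard reflect package. The tagInfo type and readMdTags helper are hypothetical, and Flogo's own handling of these tags may well differ.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// Input repeats the struct from metadata.go.
type Input struct {
	Url     string `md:"url,required"`
	Topic   string `md:"topic,required"`
	Payload string `md:"payload,required"`
}

// tagInfo is a hypothetical holder for what one md tag declares.
type tagInfo struct {
	Name     string
	Required bool
}

// readMdTags walks the struct fields and splits each md tag into a name
// and its options. It returns nil for anything that is not a struct.
func readMdTags(v interface{}) []tagInfo {
	t := reflect.TypeOf(v)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t == nil || t.Kind() != reflect.Struct {
		return nil
	}
	var infos []tagInfo
	for i := 0; i < t.NumField(); i++ {
		tag, ok := t.Field(i).Tag.Lookup("md")
		if !ok {
			continue
		}
		parts := strings.Split(tag, ",")
		info := tagInfo{Name: parts[0]}
		for _, opt := range parts[1:] {
			if opt == "required" {
				info.Required = true
			}
		}
		infos = append(infos, info)
	}
	return infos
}

func main() {
	for _, info := range readMdTags(&Input{}) {
		fmt.Printf("%s required=%t\n", info.Name, info.Required)
	}
}
```

A check like this makes it easy to spot a tag name that has drifted away from the name used in descriptor.json.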

We need to implement the FromMap and ToMap methods on pointer receivers for both the Input and Output struct types. As we’ve only defined members on the Input struct, only its methods need real logic:

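// FromMap populates the Input from the raw values supplied by the runtime.
// coerce is the github.com/project-flogo/core/data/coerce package.
func (r *Input) FromMap(values map[string]interface{}) error {
	var err error
	// Return coercion failures instead of silently discarding them.
	if r.Url, err = coerce.ToString(values["url"]); err != nil {
		return err
	}
	if r.Topic, err = coerce.ToString(values["topic"]); err != nil {
		return err
	}
	if r.Payload, err = coerce.ToString(values["payload"]); err != nil {
		return err
	}
	return nil
}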


func (r *Input) ToMap() map[string]interface{} {
	return map[string]interface{}{
		"url":     r.Url,
		"topic":   r.Topic,
		"payload": r.Payload,
	}
}

We’ll set the FromMap and ToMap methods on the Output pointer receiver to return nil and an empty map respectively:

func (o *Output) FromMap(values map[string]interface{}) error {
	return nil
}

func (o *Output) ToMap() map[string]interface{} {
	return map[string]interface{}{}
}
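A quick way to convince yourself that FromMap and ToMap agree is to round-trip a value through both. The sketch below does that without the Flogo libraries: toString is a deliberately simplified stand-in for coerce.ToString (an assumption for illustration, the real function handles more cases), and the Input type is redeclared locally without its tags.

```go
package main

import "fmt"

// Input mirrors the struct from metadata.go, minus the md tags.
type Input struct {
	Url, Topic, Payload string
}

// toString is a simplified stand-in for coerce.ToString.
func toString(v interface{}) (string, error) {
	switch t := v.(type) {
	case nil:
		return "", nil
	case string:
		return t, nil
	case []byte:
		return string(t), nil
	case int, int64, float64, bool:
		return fmt.Sprint(t), nil
	default:
		return "", fmt.Errorf("cannot coerce %T to string", v)
	}
}

// FromMap fills the struct and reports the first coercion failure.
func (r *Input) FromMap(values map[string]interface{}) error {
	var err error
	if r.Url, err = toString(values["url"]); err != nil {
		return err
	}
	if r.Topic, err = toString(values["topic"]); err != nil {
		return err
	}
	if r.Payload, err = toString(values["payload"]); err != nil {
		return err
	}
	return nil
}

// ToMap is the inverse of FromMap.
func (r *Input) ToMap() map[string]interface{} {
	return map[string]interface{}{
		"url":     r.Url,
		"topic":   r.Topic,
		"payload": r.Payload,
	}
}

func main() {
	in := Input{Url: "pulsar://localhost:6650", Topic: "topic.sample", Payload: "Hello, World"}
	var out Input
	if err := out.FromMap(in.ToMap()); err != nil {
		fmt.Println("coercion failed:", err)
		return
	}
	fmt.Println("round trip preserved the input:", out == in)
}
```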

That’s all we need to complete metadata.go implementation logic.

Let’s move on to implementing the business logic for our Flogo Activity…

The business end of the Flogo Activity lives in the activity.go file. Within activity.go we need to implement the logic in the Eval method of the Activity pointer receiver:

func (a *Activity) Eval(ctx activity.Context) (done bool, err error)

The Eval method is called by the Flogo runtime and executes the code that sends the message to the Apache Pulsar broker.

I’m not going to explain how to send messages using the Apache Pulsar broker; that task is left up to you. However, the Go API provided is fairly easy to consume and we can implement the logic in a few lines of Go code:

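// Assumed imports for this snippet; the Pulsar path is the Apache Go client.
import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/project-flogo/core/activity"
)

// Eval publishes the input payload to the configured Pulsar topic.
func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {

	ctx.Logger().Debug("Executing pulsar activity")

	// Populate the typed Input struct from the activity's inputs.
	input := &Input{}
	err = ctx.GetInputObject(input)
	if err != nil {
		return false, err
	}

	// Connect to the broker named by the url input.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: input.Url,
	})
	if err != nil {
		ctx.Logger().Error(err)
		return false, err
	}
	defer client.Close()

	// Create a producer bound to the requested topic.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: input.Topic,
	})
	if err != nil {
		ctx.Logger().Error(err)
		return false, err
	}
	defer producer.Close()

	// Send the payload and wait for the broker to acknowledge it.
	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte(input.Payload),
	})
	if err != nil {
		ctx.Logger().Error(err)
		return false, err
	}

	ctx.Logger().Debug("pulsar activity completed")
	return true, nil
}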

Flogo includes a logging API that is easily accessed through the Logger object provided by the activity.Context. The environment variable FLOGO_LOG_LEVEL can be set to control the logging level in your application at runtime.

Breakdown of the code

We create an empty Input structure and ask the context to populate it, which gives us typed access to the url, topic, and payload values:

input := &Input{}

err = ctx.GetInputObject(input)

if err != nil {
return false, err
}

In order to send messages to Apache Pulsar we need to create a Client passing in the url value:

client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: input.Url,
})
if err != nil {
	ctx.Logger().Error(err)
	return false, err
}

defer client.Close()

Once we have a client, we create our Producer:

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: input.Topic,
})
if err != nil {
	ctx.Logger().Error(err)
	return false, err
}

defer producer.Close()

Sending the payload is very simple with a call to the Send method of the Producer:

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload: []byte(input.Payload),
})
if err != nil {
	ctx.Logger().Error(err)
	return false, err
}

Testing the Activity

Start the Apache Pulsar broker:

cd /opt/tibco/apd/core/2.5/bin

./pulsar standalone

Being the good gophers that we are, let’s write our test func:

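// Assumed imports for this test file.
import (
	"testing"

	"github.com/project-flogo/core/support/test"
	"github.com/stretchr/testify/assert"
)

// TestSend needs the standalone broker started above to be running.
func TestSend(t *testing.T) {

	act := &Activity{}
	tc := test.NewActivityContext(act.Metadata())

	input := &Input{Url: "pulsar://localhost:6650", Topic: "topic.sample", Payload: "Hello, World"}

	err := tc.SetInputObject(input)
	assert.Nil(t, err)

	done, err := act.Eval(tc)
	assert.True(t, done)
	assert.Nil(t, err)
}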

go test

INFO[0000] Connecting to broker remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established local_addr="127.0.0.1:60964" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready local_addr="127.0.0.1:60964" remote_addr="pulsar://localhost:6650"
INFO[0000] Created producer cnx="127.0.0.1:60964 -> 127.0.0.1:6650" producer_name=standalone-1-3 topic="persistent://public/default/topic.sample"
INFO[0000] Closing producer producer_name=standalone-1-3 topic="persistent://public/default/topic.sample"
INFO[0000] Closed producer producer_name=standalone-1-3 topic="persistent://public/default/topic.sample"
PASS
ok github.com/mmussett/flogo-components/activity/pulsar2 5.134s

Cool, it works :-)

Using the Flogo Pulsar Activity Contribution in your Flogo App

I’ve pushed the code for this simple Flogo Activity to GitHub so it’s available for you to try with Flogo.
