Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt shared Connection Support #14

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@

#VS code
.vscode/

mqtt-shared
20 changes: 8 additions & 12 deletions activity/mqtt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,12 @@ flogo install github.com/project-flogo/edge-contrib/activity/mqtt
## Configuration

### Settings:
| Name | Type | Description
| :--- | :--- | :---
| broker | string | The broker URL - ***REQUIRED***
| id | string | The id of client - ***REQUIRED***
| username | string | The name of the user
| password | string | The password of the user
| store | string | The store for message persistence
| cleanSession | bool | Clean session flag
| topic | string | The topic to publish to - ***REQUIRED***
| retain | bool | Retain Messages
| qos | int | The quality of service
| sslConfig | object | SSL configuration
| Name | Type | Description
| :--- | :--- | :---
| topic | string | The topic to publish to - ***REQUIRED***
| retain | bool | Retain Messages
| qos | int | The quality of service
| sharedconnection | string | Reference to Mqtt Connection

#### *sslConfig* Object:
| Property | Type | Description
Expand Down Expand Up @@ -76,3 +70,5 @@ A substitution syntax is supported. For example if the topic is '/x/:/y/:' then
}
}
```

For More Information on Mqtt SharedConnection please [visit](../connection/mqtt)
33 changes: 27 additions & 6 deletions activity/mqtt/activity.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package mqtt

import (
"github.com/project-flogo/core/support/connection"
"strconv"
"strings"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/support/log"
"github.com/project-flogo/core/support/ssl"
)

var activityMd = activity.ToMetadata(&Settings{}, &Input{}, &Output{})
Expand Down Expand Up @@ -89,8 +89,22 @@ func New(ctx activity.InitContext) (activity.Activity, error) {
if err != nil {
return nil, err
}
if settings.SharedConnection != "" {

options := initClientOption(ctx.Logger(), settings)
client, err := coerce.ToConnection(settings.SharedConnection)
if err != nil {
return nil, err
}

act := &Activity{
settings: settings,
connManager: client,
}
return act, nil
}

return nil, nil
/*options := initClientOption(ctx.Logger(), settings)

if strings.HasPrefix(settings.Broker, "ssl") {

Expand Down Expand Up @@ -134,12 +148,15 @@ func New(ctx activity.InitContext) (activity.Activity, error) {
topic: ParseTopic(settings.Topic),
}
return act, nil
*/

}

type Activity struct {
settings *Settings
client mqtt.Client
topic Topic
connManager connection.Manager
}

func (a *Activity) Metadata() *activity.Metadata {
Expand All @@ -155,13 +172,15 @@ func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {
if err != nil {
return true, err
}

topic := a.settings.Topic
if params := input.TopicParams; len(params) > 0 {
topic = a.topic.String(params)
}
if token := a.client.Publish(topic, byte(a.settings.Qos), a.settings.Retain, input.Message); token.Wait() && token.Error() != nil {
ctx.Logger().Debugf("Error in publishing: %v", err)

ctx.Logger().Info("Publishing Message.", input.Message)

if token := a.connManager.GetConnection().(mqtt.Client).Publish(topic, byte(a.settings.Qos), a.settings.Retain, input.Message); token.Wait() && token.Error() != nil {
ctx.Logger().Info("Error in publishing..")
return true, token.Error()
}

Expand All @@ -170,6 +189,7 @@ func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {
return true, nil
}

/*
func initClientOption(logger log.Logger, settings *Settings) *mqtt.ClientOptions {

opts := mqtt.NewClientOptions()
Expand All @@ -186,3 +206,4 @@ func initClientOption(logger log.Logger, settings *Settings) *mqtt.ClientOptions

return opts
}
*/
2 changes: 1 addition & 1 deletion activity/mqtt/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/pkg/errors v0.8.1 // indirect
github.com/project-flogo/core v0.9.0
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a
github.com/stretchr/testify v1.3.0
golang.org/x/net v0.0.0-20190514140710-3ec191127204 // indirect
)
6 changes: 4 additions & 2 deletions activity/mqtt/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/project-flogo/core v0.9.0 h1:/iR4m5L0zj5SuqLtDDZIRyvrvG8TxwxdM0n8ZURo1I4=
github.com/project-flogo/core v0.9.0/go.mod h1:QGWi7TDLlhGUaYH3n/16ImCuulbEHGADYEXyrcHhX7U=
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a h1:6S6rhPgntonQfPxNaW2E7AiESFt0beU34f7ZfV2p9mk=
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a/go.mod h1:QGWi7TDLlhGUaYH3n/16ImCuulbEHGADYEXyrcHhX7U=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down
13 changes: 4 additions & 9 deletions activity/mqtt/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,11 @@ import (
)
skothari-tibco marked this conversation as resolved.
Show resolved Hide resolved

type Settings struct {
Broker string `md:"broker,required"` // The broker URL
Id string `md:"id,required"` // The id of client
Username string `md:"username"` // The user's name
Password string `md:"password"` // The user's password
Store string `md:"store"` // The store for message persistence
CleanSession bool `md:"cleanSession"` // Clean session flag

Retain bool `md:"retain"` // Retain Messages
Topic string `md:"topic,required"` // The topic to publish to
Topic string `md:"topic"` // The topic to publish to
Qos int `md:"qos"` // The Quality of Service
SSLConfig map[string]interface{} `md:"sslConfig"` // SSL Configuration
SharedConnection string `md:"sharedconnection"`

}

type Input struct {
Expand All @@ -41,6 +35,7 @@ func (i *Input) FromMap(values map[string]interface{}) error {
if err != nil {
return err
}

return nil
}

Expand Down
107 changes: 107 additions & 0 deletions connection/mqtt/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package mqtt

import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/support/connection"
)

func init() {

_ = connection.RegisterManagerFactory(&Factory{})
}

type Settings struct {
Broker string `md:"broker"` //The Broker to connect to
Id string `md:"id"` // Id of the client
User string `md:"user"` // User name of the client
Password string `md:"password"` //Password of the client
Store string `md:"store"` //Cert Store
Cleansess bool `md:"cleansess"` //Cleansess flag
Close uint `md:"close"` //Time in millisecond to disconnect
}

type MqttSharedConn struct {
settings *Settings
conn mqtt.Client
}

type Factory struct {
}

func (f *Factory) Type() string {

return "mqtt:paho.mqtt.golang"
}

func (*Factory) NewManager(settings map[string]interface{}) (connection.Manager, error) {
settingStruct := &Settings{}
err := metadata.MapToStruct(settings, settingStruct, true)
if err != nil {
return nil, err
}
conn, err := getMqttConnection(settingStruct)
if err != nil {
//fmt.Printf("Mqtt Client initialization got error: [%s]", err.Error())
return nil, err
}
if token := conn.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}

sharedConn := &MqttSharedConn{settings: settingStruct, conn: conn}

return sharedConn, nil
}

func (h *MqttSharedConn) Type() string {

return "mqtt:paho.mqtt.golang"
}

func (h *MqttSharedConn) GetConnection() interface{} {

return h.conn
}

func (h *MqttSharedConn) ReleaseConnection(connection interface{}) {

h.conn.Disconnect(h.settings.Close)

}

func (h *MqttSharedConn) Start() error {

return nil
}

func getMqttConnection(settings *Settings) (mqtt.Client, error) {
options := initClientOption(settings)

mqttClient := mqtt.NewClient(options)

return mqttClient, nil
}

func initClientOption(settings *Settings) *mqtt.ClientOptions {

opts := mqtt.NewClientOptions()
opts.AddBroker(settings.Broker)
opts.SetClientID(settings.Id)
opts.SetUsername(settings.User)
opts.SetPassword(settings.Password)
b, err := coerce.ToBool(settings.Cleansess)
if err != nil {
//log.Error("Error converting \"cleansess\" to a boolean ", err.Error())
return nil
}
opts.SetCleanSession(b)
if storeType := settings.Store; storeType != ":memory:" {
if settings.Store != "" {
opts.SetStore(mqtt.NewFileStore(settings.Store))
}

}
return opts
}
11 changes: 11 additions & 0 deletions connection/mqtt/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/project-flogo/edge-contrib/connection/mqtt

go 1.12

require (
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a
github.com/stretchr/objx v0.2.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b // indirect
)
24 changes: 24 additions & 0 deletions connection/mqtt/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a h1:6S6rhPgntonQfPxNaW2E7AiESFt0beU34f7ZfV2p9mk=
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a/go.mod h1:QGWi7TDLlhGUaYH3n/16ImCuulbEHGADYEXyrcHhX7U=
github.com/project-flogo/core v0.9.3 h1:uZXHR9j1Byqt+x3faNnOqB8NlEfwE2gpCh40iQ+44oA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
70 changes: 70 additions & 0 deletions example/mqtt-shared/flogo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mellistibco should we have a top-level "examples" directory in this repo? or should we have an examples directory under the mqtt activity and mqtt trigger?

Maybe we should create a top-level mqtt directory like we are doing for mongodb and have an examples directory within it. and leave the old mqtt as is.

"name": "mqtt-shared",
"type": "flogo:app",
"version": "0.0.1",
"description": "",
"appModel": "1.0.0",
"imports": [
"github.com/project-flogo/contrib/activity/log",
"github.com/project-flogo/flow",
"github.com/project-flogo/edge-contrib/connection/mqtt",
"github.com/project-flogo/contrib/trigger/timer",
"github.com/project-flogo/edge-contrib/activity/mqtt"
],
"triggers": [
{
"id": "flogo-time",
"ref": "#timer",
"settings": null,
"handlers": [
{
"settings": null,
"actions": [
{
"ref": "#flow",
"settings": {
"flowURI": "res://flow:test"
}
}
]
}
]
}
],
"connections":{
"mymqttconn":{
"ref":"github.com/project-flogo/edge-contrib/connection/mqtt",
"settings": {
"broker" : "tcp://localhost:1883",
"id": "sender_1"
}
}
},
"resources": [
{
"id": "flow:test",
"data": {
"name": "test",
"description": "A sample flow",
"tasks": [
{
"id": "Mqtt-Activity",
"name": "Send Mqtt Message using Shared Activity",
"activity": {
"ref": "github.com/project-flogo/edge-contrib/activity/mqtt",
"settings": {
"sharedconnection": "conn://mymqttconn",
"topic":"led",
"qos": "2",
"retain":true
},
"input": {
"message": "SAmple"
}
}
}
]
}
}
]
}
Loading