forked from project-flogo/contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathactivity.go
86 lines (67 loc) · 1.9 KB
/
activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data/metadata"
)
func init() {
_ = activity.Register(&Activity{}, New)
}
// activityMd is the activity metadata, built once at package initialization
// from the Input and Output types; Metadata returns it.
var activityMd = activity.ToMetadata(&Input{}, &Output{})
// Activity is a kafka activity that publishes messages to a single topic.
// Instances are created by New.
type Activity struct {
// conn is the Kafka connection obtained in New; Eval sends messages through it.
conn *KafkaConnection
// topic is copied from settings.Topic in New and set on every message Eval produces.
topic string
}
// New creates a kafka activity from the settings carried by the init context.
//
// The settings map is decoded into a Settings value, a Kafka connection is
// obtained through getKafkaConnection, and the resulting activity keeps that
// connection together with the configured topic. Any error from decoding the
// settings or from obtaining the connection is returned unchanged, without
// being logged here.
func New(ctx activity.InitContext) (activity.Activity, error) {
	var cfg Settings
	if err := metadata.MapToStruct(ctx.Settings(), &cfg, true); err != nil {
		return nil, err
	}

	kafkaConn, err := getKafkaConnection(ctx.Logger(), &cfg)
	if err != nil {
		return nil, err
	}

	return &Activity{conn: kafkaConn, topic: cfg.Topic}, nil
}
// Metadata exposes the package-level metadata (activityMd) shared by every
// kafka activity instance.
func (act *Activity) Metadata() *activity.Metadata {
	return activityMd
}
// Eval publishes the activity's input message to the configured Kafka topic.
//
// It reads the Input object from ctx, rejects an empty Message, sends the
// message through the activity's connection, and stores the partition and
// offset reported by SendMessage in the Output object.
//
// It returns done=true with a nil error once the output has been set. Every
// failure path returns done=false together with the error: reading the input,
// an empty message, a failed send, or a failure to set the output.
func (act *Activity) Eval(ctx activity.Context) (done bool, err error) {
	input := &Input{}
	if err = ctx.GetInputObject(input); err != nil {
		// Report "not done" on failure, consistent with every other error path below.
		return false, err
	}

	if input.Message == "" {
		return false, fmt.Errorf("no message to publish")
	}

	ctx.Logger().Debugf("sending Kafka message")

	msg := &sarama.ProducerMessage{
		Topic: act.topic,
		Value: sarama.StringEncoder(input.Message),
	}

	partition, offset, err := act.conn.Connection().SendMessage(msg)
	if err != nil {
		return false, fmt.Errorf("failed to send Kafka message for reason [%s]", err.Error())
	}

	// Guard the success log so the arguments are only formatted when debug
	// logging is enabled.
	if ctx.Logger().DebugEnabled() {
		ctx.Logger().Debugf("Kafka message [%v] sent successfully on partition [%d] and offset [%d]",
			input.Message, partition, offset)
	}

	output := &Output{Partition: partition, OffSet: offset}
	if err = ctx.SetOutputObject(output); err != nil {
		return false, err
	}

	return true, nil
}