From 31e7438a910a1800d664e5a13b679a4af5b9dfbd Mon Sep 17 00:00:00 2001 From: Frank Martinez Date: Tue, 5 Mar 2019 11:06:25 -0500 Subject: [PATCH 1/2] fix iterator and change resolver to [key/value] --- definition/definition.go | 2 +- definition/resolve.go | 120 +++++++--------------------- instance/scopes.go | 163 ++++++--------------------------------- instance/scopes_test.go | 63 +++++++++++++++ instance/taskinst.go | 57 +++++++------- instance/util.go | 4 +- model/context.go | 8 -- 7 files changed, 144 insertions(+), 273 deletions(-) create mode 100644 instance/scopes_test.go diff --git a/definition/definition.go b/definition/definition.go index f4b8af84..c13717d5 100644 --- a/definition/definition.go +++ b/definition/definition.go @@ -2,13 +2,13 @@ package definition import ( "fmt" - "github.com/project-flogo/core/support/log" "github.com/project-flogo/core/activity" "github.com/project-flogo/core/data" "github.com/project-flogo/core/data/expression" "github.com/project-flogo/core/data/mapper" "github.com/project-flogo/core/data/metadata" + "github.com/project-flogo/core/support/log" ) // Definition is the object that describes the definition of diff --git a/definition/resolve.go b/definition/resolve.go index dd66d64a..a07df6e1 100644 --- a/definition/resolve.go +++ b/definition/resolve.go @@ -2,19 +2,21 @@ package definition import ( "fmt" + "github.com/project-flogo/core/data/path" "github.com/project-flogo/core/data" "github.com/project-flogo/core/data/resolve" ) var defResolver = resolve.NewCompositeResolver(map[string]resolve.Resolver{ - ".": &resolve.ScopeResolver{}, - "env": &resolve.EnvResolver{}, - "property": &resolve.PropertyResolver{}, - "loop": &resolve.LoopResolver{}, - "activity": &ActivityResolver{}, - "error": &ErrorResolver{}, - "flow": &FlowResolver{}}) + ".": &resolve.ScopeResolver{}, + "env": &resolve.EnvResolver{}, + "property": &resolve.PropertyResolver{}, + "loop": &resolve.LoopResolver{}, + "iteration": &IteratorResolver{}, //todo should we create a separate resolver to use in iterations? + "activity": &ActivityResolver{}, + "error": &ErrorResolver{}, + "flow": &FlowResolver{}}) func GetDataResolver() resolve.CompositeResolver { return defResolver @@ -39,13 +41,13 @@ func (r *FlowResolver) Resolve(scope data.Scope, itemName, valueName string) (in return value, nil } -var actResolverInfo = resolve.NewResolverInfo(false, true) +var dynamicItemResolver = resolve.NewResolverInfo(false, true) type ActivityResolver struct { } func (r *ActivityResolver) GetResolverInfo() *resolve.ResolverInfo { - return actResolverInfo + return dynamicItemResolver } func (r *ActivityResolver) Resolve(scope data.Scope, itemName, valueName string) (interface{}, error) { @@ -75,88 +77,18 @@ func (r *ErrorResolver) Resolve(scope data.Scope, itemName, valueName string) (i return value, nil } -// -//func (r *FlowResolver) Resolve(toResolve string, scope data.Scope) (value interface{}, err error) { -// -// var details *data.ResolutionDetails -// -// if strings.HasPrefix(toResolve, "${") { -// details, err = data.GetResolutionDetailsOld(toResolve) -// } else if strings.HasPrefix(toResolve, "$") { -// details, err = data.GetResolutionDetails(toResolve[1:]) -// } else { -// return data.SimpleScopeResolve(toResolve, scope) -// } -// -// if err != nil { -// return nil, err -// } -// -// if details == nil { -// return nil, fmt.Errorf("unable to determine resolver for %s", toResolve) -// } -// -// var exists bool -// -// switch details.ResolverName { -// case "property": -// // Property resolution -// provider := data.GetPropertyProvider() -// value, exists = provider.GetProperty(details.Property + details.Path) //should we add the path and reset it to "" -// if !exists { -// err := fmt.Errorf("failed to resolve Property: '%s', ensure that property is configured in the application", details.Property) -// logger.Error(err.Error()) -// return nil, err -// } -// case "env": -// // Environment resolution -// value, exists = os.LookupEnv(details.Property + details.Path) -// if !exists { -// err := fmt.Errorf("failed to resolve Environment Variable: '%s', ensure that variable is configured", details.Property) -// logger.Error(err.Error()) -// return "", err -// } -// case "activity": -// attr, exists := scope.GetAttr("_A." + details.Item + "." + details.Property) -// if !exists { -// return nil, fmt.Errorf("failed to resolve activity attr: '%s', not found in flow", details.Property) -// } -// value = attr.TypedValue() -// case "error": -// attr, exists := scope.GetAttr("_E." + details.Property) -// if !exists { -// return nil, fmt.Errorf("failed to resolve error attr: '%s', not found in flow", details.Property) -// } -// value = attr.TypedValue() -// case "trigger": -// attr, exists := scope.GetAttr("_T." + details.Property) -// if !exists { -// return nil, fmt.Errorf("failed to resolve trigger attr: '%s', not found in flow", details.Property) -// } -// value = attr.TypedValue() -// case "flow": -// attr, exists := scope.GetAttr(details.Property) -// if !exists { -// return nil, fmt.Errorf("failed to resolve flow attr: '%s', not found in flow", details.Property) -// } -// value = attr.TypedValue() -// case "current": -// attr, exists := scope.GetAttr("$current." + details.Property) -// if !exists { -// return nil, fmt.Errorf("failed to resolve current working data: '%s', not found in scope", details.Property) -// } -// value = attr.TypedValue() -// default: -// return nil, fmt.Errorf("unsupported resolver: %s", details.ResolverName) -// } -// -// if details.Path != "" { -// value, err = data.PathGetValue(value, details.Path) -// if err != nil { -// logger.Error(err.Error()) -// return nil, err -// } -// } -// -// return value, nil -//} +type IteratorResolver struct { +} + +func (*IteratorResolver) GetResolverInfo() *resolve.ResolverInfo { + return dynamicItemResolver +} + +//Resolve resolved iterator value using the following syntax: $iteration[key], or $iteration[value] +func (*IteratorResolver) Resolve(scope data.Scope, item string, field string) (interface{}, error) { + value, exists := scope.GetValue("_W.iteration") + if !exists { + return nil, fmt.Errorf("failed to resolve iteration value, not in an iterator") + } + return path.GetValue(value, "."+item) +} diff --git a/instance/scopes.go b/instance/scopes.go index 52e29cce..f1051dbe 100644 --- a/instance/scopes.go +++ b/instance/scopes.go @@ -6,18 +6,16 @@ import ( "github.com/project-flogo/core/data" ) -type IteratorScope struct { - iteratorData map[string]interface{} - parent data.Scope +type WorkingDataScope struct { + workingData map[string]interface{} + parent data.Scope } -func (s *IteratorScope) GetValue(name string) (value interface{}, exists bool) { - if strings.HasPrefix(name, "$current.") { - val, ok := s.iteratorData[name[9:]] +func (s *WorkingDataScope) GetValue(name string) (value interface{}, exists bool) { + if strings.HasPrefix(name, "_W.") { + val, ok := s.workingData[name[3:]] if ok { return val, true - //attr, _ = data.NewAttribute(attrName[6:], data.ANY, val) - //return attr, true } return nil, false } else { @@ -25,141 +23,30 @@ func (s *IteratorScope) GetValue(name string) (value interface{}, exists bool) { } } -func (s *IteratorScope) SetValue(name string, value interface{}) error { +func (s *WorkingDataScope) SetValue(name string, value interface{}) error { return s.parent.SetValue(name, value) } -// -func NewIteratorScope(parentScope data.Scope, workingData map[string]interface{}) data.Scope { +func (s *WorkingDataScope) GetWorkingValue(name string) (value interface{}, exists bool) { + val, ok := s.workingData[name] + if ok { + return val, true + } + return nil, false +} + +func (s *WorkingDataScope) SetWorkingValue(name string, value interface{}) error { + s.workingData[name] = value + return nil +} + +// NewWorkingDataScope +func NewWorkingDataScope(parentScope data.Scope) *WorkingDataScope { - scope := &IteratorScope{ - parent: parentScope, - iteratorData: workingData, + scope := &WorkingDataScope{ + parent: parentScope, + workingData: make(map[string]interface{}), } return scope } - -// -//// WorkingDataScope is scope restricted by the set of reference attrs and backed by the specified Task -//type WorkingDataScope struct { -// parent data.Scope -// workingData map[string]*data.Attribute -//} -// -//// NewFixedTaskScope creates a FixedTaskScope -//func NewWorkingDataScope(parentScope data.Scope, workingData map[string]*data.Attribute) data.Scope { -// -// scope := &WorkingDataScope{ -// parent: parentScope, -// workingData: workingData, -// } -// -// return scope -//} -// -//// GetAttr implements Scope.GetAttr -//func (s *WorkingDataScope) GetAttr(attrName string) (attr *data.Attribute, exists bool) { -// -// if strings.HasPrefix(attrName, "$current.") { -// val, ok := s.workingData[attrName[9:]] -// if ok { -// return val, true -// //attr, _ = data.NewAttribute(attrName[6:], data.ANY, val) -// //return attr, true -// } -// return nil, false -// } else { -// return s.parent.GetAttr(attrName) -// } -//} -// -//// SetAttrValue implements Scope.SetAttrValue -//func (s *WorkingDataScope) SetAttrValue(attrName string, value interface{}) error { -// return s.parent.SetAttrValue(attrName, value) -//} -// -//// FixedTaskScope is scope restricted by the set of reference attrs and backed by the specified Task -//type FixedTaskScope struct { -// attrs map[string]*data.Attribute -// refAttrs map[string]*data.Attribute -// activityCfg *definition.ActivityConfig -// isInput bool -//} -// -//// NewFixedTaskScope creates a FixedTaskScope -//func NewFixedTaskScope(refAttrs map[string]*data.Attribute, task *definition.Task, isInput bool) data.Scope { -// -// scope := &FixedTaskScope{ -// refAttrs: refAttrs, -// isInput: isInput, -// } -// -// if task != nil { -// scope.activityCfg = task.ActivityConfig() -// } -// -// return scope -//} -// -//// GetAttr implements Scope.GetAttr -//func (s *FixedTaskScope) GetAttr(attrName string) (attr *data.Attribute, exists bool) { -// -// if len(s.attrs) > 0 { -// -// attr, found := s.attrs[attrName] -// -// if found { -// return attr, true -// } -// } -// -// if s.activityCfg != nil { -// -// var attr *data.Attribute -// var found bool -// -// if s.isInput { -// attr, found = s.activityCfg.GetInputAttr(attrName) -// } else { -// attr, found = s.activityCfg.GetOutputAttr(attrName) -// } -// -// if !found { -// attr, found = s.refAttrs[attrName] -// } -// -// return attr, found -// } -// -// return nil, false -//} -// -//// SetAttrValue implements Scope.SetAttrValue -//func (s *FixedTaskScope) SetAttrValue(attrName string, value interface{}) error { -// -// if len(s.attrs) == 0 { -// s.attrs = make(map[string]*data.Attribute) -// } -// -// logger.Debugf("SetAttr: %s = %v", attrName, value) -// -// attr, found := s.attrs[attrName] -// -// var err error -// if found { -// err = attr.SetValue(value) -// } else { -// // look up reference for type -// attr, found = s.refAttrs[attrName] -// if found { -// s.attrs[attrName], err = data.NewAttribute(attrName, attr.Type(), value) -// } else { -// logger.Debugf("SetAttr: Attr '%s' not found in metadata\n", attrName) -// logger.Debugf("SetAttr: metadata %v\n", s.refAttrs) -// } -// //todo: else error -// } -// -// return err -//} diff --git a/instance/scopes_test.go b/instance/scopes_test.go new file mode 100644 index 00000000..54556156 --- /dev/null +++ b/instance/scopes_test.go @@ -0,0 +1,63 @@ +package instance + +import ( + "testing" + + "github.com/project-flogo/core/data" + "github.com/project-flogo/core/data/resolve" + "github.com/project-flogo/flow/definition" + "github.com/stretchr/testify/assert" +) + +func TestWorkingDataScope_GetValue(t *testing.T) { + + vals := map[string]interface{}{"foo": 1, "bar": 2} + baseScope := data.NewSimpleScope(vals, nil) + + workingDataScope := NewWorkingDataScope(baseScope) + + iteration := map[string]interface{}{ + "key": 1, + "value": "blah", + } + + workingDataScope.SetWorkingValue("iteration", iteration) + + v, exists := workingDataScope.GetWorkingValue("iteration") + assert.True(t, exists) + assert.Equal(t, iteration, v) + + v, exists = workingDataScope.GetValue("_W.iteration") + assert.True(t, exists) + assert.Equal(t, iteration, v) + + v, exists = workingDataScope.GetValue("foo") + assert.True(t, exists) + assert.Equal(t, 1, v) +} + +func TestIterationResolver(t *testing.T) { + + vals := map[string]interface{}{"foo": 1, "bar": 2} + baseScope := data.NewSimpleScope(vals, nil) + + workingDataScope := NewWorkingDataScope(baseScope) + + iteration := map[string]interface{}{ + "key": 1, + "value": "blah", + } + + workingDataScope.SetWorkingValue("iteration", iteration) + + resolver := resolve.NewCompositeResolver(map[string]resolve.Resolver{ + "iteration": &definition.IteratorResolver{}}) + + v, err := resolver.Resolve("$iteration[key]", workingDataScope) + assert.Nil(t, err) + assert.Equal(t, 1, v) + + v, err = resolver.Resolve("$iteration[value]", workingDataScope) + assert.Nil(t, err) + assert.Equal(t, "blah", v) +} diff --git a/instance/taskinst.go b/instance/taskinst.go index 5980a04b..b7ddbb59 100644 --- a/instance/taskinst.go +++ b/instance/taskinst.go @@ -34,7 +34,7 @@ type TaskInst struct { task *definition.Task status model.TaskStatus - workingData map[string]interface{} + workingData *WorkingDataScope inputs map[string]interface{} outputs map[string]interface{} @@ -120,24 +120,22 @@ func (ti *TaskInst) SetStatus(status model.TaskStatus) { ti.flowInst.master.ChangeTracker.trackTaskData(ti.flowInst.subFlowId, &TaskInstChange{ChgType: CtUpd, ID: ti.task.ID(), TaskInst: ti}) } -func (ti *TaskInst) HasWorkingData() bool { - return ti.workingData != nil -} - -func (ti *TaskInst) Resolve(toResolve string) (value interface{}, err error) { - //Support expression mapping +//func (ti *TaskInst) HasWorkingData() bool { +// return ti.workingData != nil +//} - //return exprmapper.GetMappingValue(toResolve, ti.flowInst, definition.GetDataResolver()) - return nil, nil -} +//func (ti *TaskInst) Resolve(toResolve string) (value interface{}, err error) { +// //Support expression mapping +// +// //return exprmapper.GetMappingValue(toResolve, ti.flowInst, definition.GetDataResolver()) +// return nil, nil +//} func (ti *TaskInst) SetWorkingData(key string, value interface{}) error { if ti.workingData == nil { - ti.workingData = make(map[string]interface{}) + ti.workingData = NewWorkingDataScope(ti.flowInst) } - ti.workingData[key] = value - - return nil + return ti.workingData.SetWorkingValue(key, value) } func (ti *TaskInst) GetWorkingData(key string) (interface{}, bool) { @@ -145,8 +143,7 @@ func (ti *TaskInst) GetWorkingData(key string) (interface{}, bool) { return nil, false } - v, ok := ti.workingData[key] - return v, ok + return ti.workingData.GetWorkingValue(key) } // Task implements model.TaskContext.Task, by returning the Task associated with this @@ -379,20 +376,20 @@ func (ti *TaskInst) GetSetting(name string) (value interface{}, exists bool) { return value, exists } -// FlowReply is used to reply to the Flow Host with the results of the execution -func (ti *TaskInst) FlowReply(replyData map[string]interface{}, err error) { - //ignore -} - -// FlowReturn is used to indicate to the Flow Host that it should complete and return the results of the execution -func (ti *TaskInst) FlowReturn(returnData map[string]interface{}, err error) { - - if err != nil { - for name, value := range returnData { - ti.SetWorkingData(name, value) - } - } -} +//// FlowReply is used to reply to the Flow Host with the results of the execution +//func (ti *TaskInst) FlowReply(replyData map[string]interface{}, err error) { +// //ignore +//} +// +//// FlowReturn is used to indicate to the Flow Host that it should complete and return the results of the execution +//func (ti *TaskInst) FlowReturn(returnData map[string]interface{}, err error) { +// +// if err != nil { +// for name, value := range returnData { +// ti.SetWorkingData(name, value) +// } +// } +//} func (ti *TaskInst) appendErrorData(err error) { diff --git a/instance/util.go b/instance/util.go index 5aca0da0..335ea168 100644 --- a/instance/util.go +++ b/instance/util.go @@ -52,8 +52,8 @@ func applyInputMapper(taskInst *TaskInst) error { var inputScope data.Scope inputScope = taskInst.flowInst - if taskInst.workingData != nil { //and an iterator - inputScope = NewIteratorScope(taskInst.flowInst, taskInst.workingData) + if taskInst.workingData != nil { + inputScope = taskInst.workingData } var err error diff --git a/model/context.go b/model/context.go index fda0a4f6..b901a4f4 100644 --- a/model/context.go +++ b/model/context.go @@ -56,14 +56,6 @@ type TaskContext interface { GetWorkingData(key string) (interface{}, bool) - //Resolve(toResolve string) (value interface{}, err error) - - //AddWorkingData(attr *data.Attribute) - - //UpdateWorkingData(key string, value interface{}) error - - //GetWorkingData(key string) (*data.Attribute, bool) - FlowLogger() log.Logger } From 91ea4a3929422e899e4ab1f527d20ae33c26cbe4 Mon Sep 17 00:00:00 2001 From: Frank Martinez Date: Tue, 5 Mar 2019 12:47:19 -0500 Subject: [PATCH 2/2] fix task events --- instance/taskevents.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/instance/taskevents.go b/instance/taskevents.go index 62671591..35b25b8d 100644 --- a/instance/taskevents.go +++ b/instance/taskevents.go @@ -100,12 +100,12 @@ func postTaskEvent(taskInstance *TaskInst) { te.taskOut = make(map[string]interface{}) // Add working data - wData := taskInstance.workingData - if wData != nil && len(wData) > 0 { - for name, attVal := range wData { - te.taskIn[name] = attVal - } - } + //wData := taskInstance.workingData + //if wData != nil && len(wData) > 0 { + // for name, attVal := range wData { + // te.taskIn[name] = attVal + // } + //} // Add activity input/output // TODO optimize this computation for given instance @@ -120,7 +120,7 @@ func postTaskEvent(taskInstance *TaskInst) { for name, attVal := range actConfig.Activity.Metadata().Input { te.taskIn[name] = attVal.Value() if taskInstance.inputs != nil { - scopedValue, ok := taskInstance.inputs(name) + scopedValue, ok := taskInstance.inputs[name] if ok { te.taskIn[name] = scopedValue } @@ -133,7 +133,7 @@ func postTaskEvent(taskInstance *TaskInst) { for name, attVal := range actConfig.Activity.Metadata().Output { te.taskOut[name] = attVal.Value() if taskInstance.outputs != nil { - scopedValue, ok := taskInstance.outputs(name) + scopedValue, ok := taskInstance.outputs[name] if ok { te.taskOut[name] = scopedValue } @@ -147,7 +147,7 @@ func postTaskEvent(taskInstance *TaskInst) { for name, attVal := range actConfig.Activity.Metadata().Input { te.taskIn[name] = attVal.Value() if taskInstance.inputs != nil { - scopedValue, ok := taskInstance.inputs(name) + scopedValue, ok := taskInstance.inputs[name] if ok { te.taskIn[name] = scopedValue } @@ -159,7 +159,7 @@ func postTaskEvent(taskInstance *TaskInst) { for name, attVal := range actConfig.Activity.Metadata().Output { te.taskOut[name] = attVal.Value() if taskInstance.outputs != nil { - scopedValue, ok := taskInstance.outputs(name) + scopedValue, ok := taskInstance.outputs[name] if ok { te.taskOut[name] = scopedValue }