-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultitask.go
121 lines (106 loc) · 2.71 KB
/
multitask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package multitask
import (
"errors"
"fmt"
"time"
)
// handleMultiTask runs fn through handleTask in a separate goroutine and
// delivers exactly one *Value for task_id on ch: the task's own result when it
// arrives within timeout, otherwise a Value whose Error is ErrExcuteTimeout.
//
// ExcutionTime on the delivered Value is the wall-clock time elapsed since this
// call started. fn is not interrupted on timeout; it keeps running in the
// background and its late result is discarded.
func handleMultiTask(task_id int, fn func() (interface{}, error), timeout time.Duration, ch chan interface{}) {
	startTime := time.Now()

	// Capacity 1 lets handleTask hand over its single result and exit even when
	// nobody is receiving any more (the timeout case), so no extra goroutine is
	// needed just to drain the channel.
	ch_run := make(chan interface{}, 1)
	go handleTask(task_id, ch_run, fn)

	// An explicit timer can be released as soon as the task finishes instead of
	// lingering until the full timeout has elapsed.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case re := <-ch_run:
		re.(*Value).ExcutionTime = time.Since(startTime)
		ch <- re
	case <-timer.C:
		ch <- &Value{
			ID:           task_id,
			Data:         nil,
			Error:        ErrExcuteTimeout,
			ExcutionTime: time.Since(startTime),
		}
	}
}
// handleTask invokes fn and sends exactly one *Value for task_id on ch.
//
// On a normal return the Value carries fn's data and error. If fn panics, the
// panic is recovered and the Value instead carries nil Data and an Error built
// by merging ErrExcutePanic with the textual form of the panic value.
func handleTask(task_id int, ch chan interface{}, fn func() (interface{}, error)) {
	defer func() {
		if r := recover(); r != nil {
			// A panic value can be of any type (e.g. panic(42)), so render it
			// with default formatting; the %s verb would turn non-string
			// values into "%!s(int=42)"-style noise.
			ch <- &Value{
				ID:    task_id,
				Data:  nil,
				Error: mergeErrors(ErrExcutePanic, errors.New(fmt.Sprint(r))),
			}
		}
	}()

	data, err := fn()
	ch <- &Value{
		ID:    task_id,
		Data:  data,
		Error: err,
	}
}
// taskManger holds the settings used when executing a batch of functions.
type taskManger struct {
// ExpiryDuration is the timeout applied to each function; excute rejects
// non-positive values with ErrInvalidExpiryDuration.
ExpiryDuration time.Duration
// Option holds the settings initialised from defaultOptions and adjusted by
// the TaskOption values passed to NewTask.
Option taskOption
}
// MultitaskFuncChains is a list of MultitaskFunc values.
type MultitaskFuncChains []MultitaskFunc
// MultitaskFunc is a single unit of work: a function that takes no arguments
// and returns an arbitrary result together with an error.
type MultitaskFunc func() (interface{}, error) // ([]*multitask.Resulter, error)
// NewTask builds a task manager whose per-function timeout is expiryDuration.
// Its options start from defaultOptions() and are then adjusted by each entry
// of opts, applied in the order given.
func NewTask(expiryDuration time.Duration, opts ...TaskOption) *taskManger {
	manager := new(taskManger)
	manager.ExpiryDuration = expiryDuration
	manager.Option = defaultOptions()

	for idx := 0; idx < len(opts); idx++ {
		opts[idx].apply(&manager.Option)
	}
	return manager
}
// Excute runs every function in fnCollection and returns one *Value per
// function. It is the exported entry point and simply forwards to excute.
func (t *taskManger) Excute(fnCollection []MultitaskFunc) ([]*Value, error) {
	values, err := t.excute(fnCollection)
	return values, err
}
// excute runs the functions in fnCollection concurrently and returns one
// *Value per function, in the same order as fnCollection.
//
// Every function is given t.ExpiryDuration to finish. At most
// t.Option.QuantityPreExecution functions are in flight at once; a
// non-positive setting means the whole collection may run at the same time.
//
// It returns ErrInvalidExpiryDuration when t.ExpiryDuration is not positive.
// Otherwise the error result is nil and per-function failures are reported
// through the Error field of the corresponding Value.
func (t *taskManger) excute(fnCollection []MultitaskFunc) ([]*Value, error) {
	if t.ExpiryDuration <= 0 {
		return nil, ErrInvalidExpiryDuration
	}

	// Resolve the concurrency limit into a local. Writing the fallback back
	// into t.Option would pin the limit to the size of the first batch ever
	// executed and throttle every later, larger batch.
	maxTaskCount := t.Option.QuantityPreExecution
	if maxTaskCount <= 0 {
		maxTaskCount = len(fnCollection)
	}

	// chLimit is a counting semaphore: a slot is taken before a task starts
	// and handed back when the task is done.
	chLimit := make(chan bool, maxTaskCount)
	chs := make([]chan interface{}, len(fnCollection))

	limitFunc := func(ch chan interface{}, task_id int, fn func() (interface{}, error), timeout time.Duration) {
		defer func() {
			// Swallow a panic escaping handleMultiTask, and give the slot
			// back exactly once whether or not one occurred.
			_ = recover()
			<-chLimit
		}()
		handleMultiTask(task_id, fn, timeout, ch)
	}

	for i, fn := range fnCollection {
		// Buffered so the task can deposit its result and finish before the
		// collection loop below gets to it.
		chs[i] = make(chan interface{}, 1)
		chLimit <- true // blocks while maxTaskCount tasks are already running
		go limitFunc(chs[i], i, fn, t.ExpiryDuration)
	}

	values := make([]*Value, len(fnCollection))
	for i, ch := range chs {
		values[i] = (<-ch).(*Value)
	}
	return values, nil
}