forked from influxdata/flux
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery.go
111 lines (95 loc) · 3.93 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package flux
import (
"time"
)
// Query represents an active query.
// It exposes the query's results, its error state and its statistics,
// together with the methods used to cancel it and to release its resources.
type Query interface {
// Results returns a receive-only channel that will deliver the query results.
// It is possible that the channel is closed before any results arrive,
// in which case the query should be inspected for an error using Err().
Results() <-chan Result
// Done must always be called to free resources. It is safe to call Done
// multiple times.
Done()
// Cancel will signal that query execution should stop.
// Done must still be called to free resources.
// It is safe to call Cancel multiple times.
Cancel()
// Err reports any error the query may have encountered.
Err() error
// Statistics reports the statistics for the query.
// The statistics are not complete until Done is called.
Statistics() Statistics
}
// Metadata is a multi-valued collection of key/value pairs: every string key
// maps to the list of values that have been recorded under it.
type Metadata map[string][]interface{}

// Add appends value to the list of values stored under key.
func (md Metadata) Add(key string, value interface{}) {
	current := md[key]
	md[key] = append(current, value)
}

// AddAll appends every value found in other to md, key by key.
// Values already present in md under the same key are kept and the
// values from other are placed after them.
func (md Metadata) AddAll(other Metadata) {
	for k, vs := range other {
		merged := append(md[k], vs...)
		md[k] = merged
	}
}

// Range walks over the Metadata and calls fn once per key/value pair.
// A key that holds several values is passed to fn once for each of its
// values. Iteration stops as soon as fn returns false.
func (md Metadata) Range(fn func(key string, value interface{}) bool) {
	for k, vs := range md {
		for i := range vs {
			if !fn(k, vs[i]) {
				return
			}
		}
	}
}

// Del removes key and all of the values stored under it.
func (md Metadata) Del(key string) {
	delete(md, key)
}
// Statistics is a collection of statistics about the processing of a query.
type Statistics struct {
	// TotalDuration is the total amount of time in nanoseconds spent.
	TotalDuration time.Duration `json:"total_duration"`
	// CompileDuration is the amount of time in nanoseconds spent compiling the query.
	CompileDuration time.Duration `json:"compile_duration"`
	// QueueDuration is the amount of time in nanoseconds spent queueing.
	QueueDuration time.Duration `json:"queue_duration"`
	// PlanDuration is the amount of time in nanoseconds spent planning the query.
	PlanDuration time.Duration `json:"plan_duration"`
	// RequeueDuration is the amount of time in nanoseconds spent requeueing.
	RequeueDuration time.Duration `json:"requeue_duration"`
	// ExecuteDuration is the amount of time in nanoseconds spent executing the query.
	ExecuteDuration time.Duration `json:"execute_duration"`

	// Concurrency is the number of goroutines allocated to process the query.
	Concurrency int `json:"concurrency"`
	// MaxAllocated is the maximum number of bytes the query allocated.
	MaxAllocated int64 `json:"max_allocated"`
	// TotalAllocated is the total number of bytes allocated.
	// The number includes memory that was freed and then used again.
	TotalAllocated int64 `json:"total_allocated"`

	// RuntimeErrors contains error messages that happened during the execution of the query.
	RuntimeErrors []string `json:"runtime_errors"`

	// Metadata contains metadata key/value pairs that have been attached during execution.
	Metadata Metadata `json:"metadata"`
}

// Add returns the sum of s and other.
//
// Every duration and counter field is added field by field, the
// RuntimeErrors of s are followed by those of other, and the Metadata of
// both is merged. The returned value holds a newly allocated RuntimeErrors
// slice and a newly created Metadata map, both of which are non-nil even
// when the inputs carry none.
func (s Statistics) Add(other Statistics) Statistics {
	// Build the combined error list in a fresh slice sized for both inputs.
	allErrs := make([]string, 0, len(s.RuntimeErrors)+len(other.RuntimeErrors))
	allErrs = append(allErrs, s.RuntimeErrors...)
	allErrs = append(allErrs, other.RuntimeErrors...)

	// Merge into a fresh map; the values of s come before those of other.
	merged := Metadata{}
	merged.AddAll(s.Metadata)
	merged.AddAll(other.Metadata)

	var sum Statistics
	sum.TotalDuration = s.TotalDuration + other.TotalDuration
	sum.CompileDuration = s.CompileDuration + other.CompileDuration
	sum.QueueDuration = s.QueueDuration + other.QueueDuration
	sum.PlanDuration = s.PlanDuration + other.PlanDuration
	sum.RequeueDuration = s.RequeueDuration + other.RequeueDuration
	sum.ExecuteDuration = s.ExecuteDuration + other.ExecuteDuration
	sum.Concurrency = s.Concurrency + other.Concurrency
	sum.MaxAllocated = s.MaxAllocated + other.MaxAllocated
	sum.TotalAllocated = s.TotalAllocated + other.TotalAllocated
	sum.RuntimeErrors = allErrs
	sum.Metadata = merged
	return sum
}