diff --git a/ChangeLog.md b/ChangeLog.md index e2e78be..17bcb6b 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -6,6 +6,10 @@ to [Semantic Versioning](http://semver.org/) rules. ## [Next Release] +## [v1.13.5] - 2023-03-14 + +* add: Adds support for PromQL instant queries. + ## [v1.13.4] - 2023-03-10 * fix: Restores some quote handling for metric name parsing but corrects the @@ -475,6 +479,7 @@ writing to histogram endpoints. any delay, once started. Created: 2019-03-12. Fixed: 2019-03-13. [Next Release]: https://github.com/circonus-labs/gosnowth +[v1.13.5]: https://github.com/circonus-labs/gosnowth/releases/tag/v1.13.5 [v1.13.4]: https://github.com/circonus-labs/gosnowth/releases/tag/v1.13.4 [v1.13.3]: https://github.com/circonus-labs/gosnowth/releases/tag/v1.13.3 [v1.13.2]: https://github.com/circonus-labs/gosnowth/releases/tag/v1.13.2 diff --git a/promql.go b/promql.go index 9909b7b..032a2b5 100644 --- a/promql.go +++ b/promql.go @@ -11,6 +11,17 @@ import ( "time" ) +// PromQLInstantQuery values represent Prometheus queries for data from a +// single point in time. +// These values are accepted as strings and will accept the same values as +// would be passed to the prometheus /api/v1/query endpoint. +type PromQLInstantQuery struct { + Query string `json:"query,omitempty"` + Time string `json:"time,omitempty"` + Timeout string `json:"timeout,omitempty"` + AccountID string `json:"account_id,omitempty"` +} + // PromQLRangeQuery values represent Prometheus range queries. // These values are accepted as strings and will accept the same values as // would be passed to the prometheus /api/v1/query_range endpoint. @@ -64,6 +75,150 @@ func (pe *PromQLError) Error() string { return pe.String() } +// PromQLInstantQuery evaluates a PromQL query at a single point in time. +func (sc *SnowthClient) PromQLInstantQuery(query *PromQLInstantQuery, + nodes ...*SnowthNode, +) (*PromQLResponse, error) { + return sc.PromQLInstantQueryContext(context.Background(), query, nodes...) +} + +// PromQLInstantQueryContext is the context aware version of PromQLInstantQuery. +func (sc *SnowthClient) PromQLInstantQueryContext(ctx context.Context, + query *PromQLInstantQuery, + nodes ...*SnowthNode, +) (*PromQLResponse, error) { + var node *SnowthNode + + if len(nodes) > 0 && nodes[0] != nil { + node = nodes[0] + } else { + node = sc.GetActiveNode() + } + + if node == nil { + return nil, fmt.Errorf("unable to get active node") + } + + if query == nil { + return nil, fmt.Errorf("invalid PromQL query: null") + } + + u := "/extension/lua/public/caql_v1" + + q := &CAQLQuery{ + Query: `#lang="promql" ` + query.Query, + Format: "PQ", + Period: 1, + } + + if query.AccountID != "" { + i, err := strconv.ParseInt(query.AccountID, 10, 64) + if err != nil { + return nil, + fmt.Errorf("invalid PromQL query: invalid account_id: %v", + query.AccountID) + } + + q.AccountID = i + } + + if query.Time != "" { + if t, err := time.Parse(time.RFC3339, query.Time); err != nil { + i, err := strconv.ParseInt(query.Time, 10, 64) + if err != nil { + return nil, + fmt.Errorf("invalid PromQL query: invalid time: %v", + query.Time) + } + + q.End = i + q.Start = i - 1 + } else { + q.End = t.Unix() + q.Start = t.Unix() - 1 + } + } + + if query.Timeout != "" { + if d, err := time.ParseDuration(query.Timeout); err != nil { + f, err := strconv.ParseFloat(query.Timeout, 64) + if err != nil { + return nil, + fmt.Errorf("invalid PromQL range query: invalid timeout: %v", + query.Timeout) + } + + q.Timeout = int64(f) + } else { + q.Period = int64(d.Seconds()) + } + } + + qBuf, err := encodeJSON(q) + if err != nil { + return nil, err + } + + bBuf, err := io.ReadAll(qBuf) + if err != nil { + return nil, fmt.Errorf("unable to read request body buffer: %w", err) + } + + // CAQL extension does not like the JSON in the request body to end with \n. + if strings.HasSuffix(string(bBuf), "\n") { + bBuf = bBuf[:len(bBuf)-1] + } + + var r *PromQLResponse + + body, _, rErr := sc.DoRequestContext(ctx, node, "POST", u, + bytes.NewBuffer(bBuf), nil) + + if body == nil { + return nil, rErr + } + + buf, err := io.ReadAll(body) + if err != nil { + return nil, fmt.Errorf("unable to read response body buffer: %w", + err) + } + + if rErr != nil { + r = &PromQLResponse{ + Status: "error", + ErrorType: "database", + Error: string(buf), + } + + var cErr *CAQLError + + err := decodeJSON(bytes.NewBuffer(buf), &cErr) + if err != nil { + return nil, fmt.Errorf("unable to decode error response: %w", + err) + } + + if cErr != nil && cErr.Message() != "" { + r.ErrorType = "caql" + r.Error = cErr.Message() + } + + rErr = &PromQLError{ + Status: r.Status, + ErrorType: r.ErrorType, + Err: r.Error, + } + } else { + if err := decodeJSON(bytes.NewBuffer(buf), &r); err != nil { + return nil, fmt.Errorf("unable to decode PromQL response: %w", + err) + } + } + + return r, rErr +} + // PromQLRangeQuery evaluates a PromQL query over a range of time. func (sc *SnowthClient) PromQLRangeQuery(query *PromQLRangeQuery, nodes ...*SnowthNode, @@ -89,7 +244,7 @@ func (sc *SnowthClient) PromQLRangeQueryContext(ctx context.Context, } if query == nil { - return nil, fmt.Errorf("invalid PromQL query: null") + return nil, fmt.Errorf("invalid PromQL range query: null") } u := "/extension/lua/public/caql_v1" @@ -99,8 +254,6 @@ func (sc *SnowthClient) PromQLRangeQueryContext(ctx context.Context, Format: "PQR", } - q.Format = "PQR" - if query.AccountID != "" { i, err := strconv.ParseInt(query.AccountID, 10, 64) if err != nil { diff --git a/promql_test.go b/promql_test.go index 20ecb18..e0ddc9c 100644 --- a/promql_test.go +++ b/promql_test.go @@ -16,7 +16,28 @@ const testPromQLError = `{ "error": "test" }` -const testPromQLResponse = `{ +const testPromQLInstantQueryResponse = `{ + "status": "success", + "data": { + "result": [ + { + "value": [ + [ + 1676388600, + "3568" + ] + ], + "metric": { + "__name__": "bytes", + "__check_uuid": "09fc1c4e-8540-49a8-a109-8895553718fc" + } + } + ], + "resulttype": "vector" + } +}` + +const testPromQLRangeQueryResponse = `{ "status": "success", "data": { "result": [ @@ -49,6 +70,105 @@ const testPromQLResponse = `{ } }` +func TestPromQLInstantQuery(t *testing.T) { + t.Parallel() + + ms := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, + r *http.Request, + ) { + if r.RequestURI == "/state" { + _, _ = w.Write([]byte(stateTestData)) + + return + } + + if r.RequestURI == "/stats.json" { + _, _ = w.Write([]byte(statsTestData)) + + return + } + + if r.Method == "POST" && strings.HasPrefix(r.RequestURI, + "/extension/lua/public/caql_v1") { + b, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(testPromQLError)) + + return + } + + if len(b) == 0 { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(testPromQLError)) + + return + } + + if strings.Contains(string(b), "127") { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(testCAQLError)) + + return + } + + _, _ = w.Write([]byte(testPromQLInstantQueryResponse)) + + return + } + })) + + defer ms.Close() + + sc, err := NewClient(context.Background(), + &Config{Servers: []string{ms.URL}}) + if err != nil { + t.Fatal("Unable to create snowth client", err) + } + + sc.SetRetries(1) + sc.SetConnectRetries(1) + + u, err := url.Parse(ms.URL) + if err != nil { + t.Fatal("Invalid test URL") + } + + node := &SnowthNode{url: u} + + res, err := sc.PromQLInstantQuery(&PromQLInstantQuery{ + AccountID: "1", + Query: "test", + Time: "300", + }, node) + if err != nil { + t.Fatal(err) + } + + if res.Data == nil { + t.Fatalf("Expected data: 2, got: %v", res.Data) + } + + res, err = sc.PromQLInstantQuery(&PromQLInstantQuery{ + AccountID: "1", + Query: "test", + Time: "127", + }, node) + if err == nil { + t.Fatal("Expected PromQL error response") + } + + if res.ErrorType != "caql" { + t.Errorf("Expected error type: caql, got: %v", res.ErrorType) + } + + exp := "Function not found: histograms" + + if res.Error != exp { + t.Errorf("Expected error: %v, got: %v", exp, res.Error) + } +} + func TestPromQLRangeQuery(t *testing.T) { t.Parallel() @@ -91,7 +211,7 @@ func TestPromQLRangeQuery(t *testing.T) { return } - _, _ = w.Write([]byte(testPromQLResponse)) + _, _ = w.Write([]byte(testPromQLRangeQueryResponse)) return }