Skip to content

Commit

Permalink
CIRC-9934: Add support for PromQL instant queries (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaifley authored Mar 14, 2023
1 parent 8ecd595 commit 54111db
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 5 deletions.
5 changes: 5 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ to [Semantic Versioning](http://semver.org/) rules.

## [Next Release]

## [v1.13.5] - 2023-03-14

* add: Adds support for PromQL instant queries.

## [v1.13.4] - 2023-03-10

* fix: Restores some quote handling for metric name parsing but corrects the
Expand Down Expand Up @@ -475,6 +479,7 @@ writing to histogram endpoints.
any delay, once started. Created: 2019-03-12. Fixed: 2019-03-13.

[Next Release]: https://github.com/circonus-labs/gosnowth
[v1.13.5]: https://github.com/circonus-labs/gosnowth/releases/tag/v1.13.5
[v1.13.4]: https://github.com/circonus-labs/gosnowth/releases/tag/v1.13.4
[v1.13.3]: https://github.com/circonus-labs/gosnowth/releases/tag/v1.13.3
[v1.13.2]: https://github.com/circonus-labs/gosnowth/releases/tag/v1.13.2
Expand Down
159 changes: 156 additions & 3 deletions promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import (
"time"
)

// PromQLInstantQuery values represent Prometheus queries for data from a
// single point in time.
// These values are accepted as strings and will accept the same values as
// would be passed to the prometheus /api/v1/query endpoint.
type PromQLInstantQuery struct {
Query string `json:"query,omitempty"`
Time string `json:"time,omitempty"`
Timeout string `json:"timeout,omitempty"`
AccountID string `json:"account_id,omitempty"`
}

// PromQLRangeQuery values represent Prometheus range queries.
// These values are accepted as strings and will accept the same values as
// would be passed to the prometheus /api/v1/query_range endpoint.
Expand Down Expand Up @@ -64,6 +75,150 @@ func (pe *PromQLError) Error() string {
return pe.String()
}

// PromQLInstantQuery evaluates a PromQL query at a single point in time.
func (sc *SnowthClient) PromQLInstantQuery(query *PromQLInstantQuery,
nodes ...*SnowthNode,
) (*PromQLResponse, error) {
return sc.PromQLInstantQueryContext(context.Background(), query, nodes...)
}

// PromQLInstantQueryContext is the context aware version of PromQLInstantQuery.
func (sc *SnowthClient) PromQLInstantQueryContext(ctx context.Context,
query *PromQLInstantQuery,
nodes ...*SnowthNode,
) (*PromQLResponse, error) {
var node *SnowthNode

if len(nodes) > 0 && nodes[0] != nil {
node = nodes[0]
} else {
node = sc.GetActiveNode()
}

if node == nil {
return nil, fmt.Errorf("unable to get active node")
}

if query == nil {
return nil, fmt.Errorf("invalid PromQL query: null")
}

u := "/extension/lua/public/caql_v1"

q := &CAQLQuery{
Query: `#lang="promql" ` + query.Query,
Format: "PQ",
Period: 1,
}

if query.AccountID != "" {
i, err := strconv.ParseInt(query.AccountID, 10, 64)
if err != nil {
return nil,
fmt.Errorf("invalid PromQL query: invalid account_id: %v",
query.AccountID)
}

q.AccountID = i
}

if query.Time != "" {
if t, err := time.Parse(time.RFC3339, query.Time); err != nil {
i, err := strconv.ParseInt(query.Time, 10, 64)
if err != nil {
return nil,
fmt.Errorf("invalid PromQL query: invalid time: %v",
query.Time)
}

q.End = i
q.Start = i - 1
} else {
q.End = t.Unix()
q.Start = t.Unix() - 1
}
}

if query.Timeout != "" {
if d, err := time.ParseDuration(query.Timeout); err != nil {
f, err := strconv.ParseFloat(query.Timeout, 64)
if err != nil {
return nil,
fmt.Errorf("invalid PromQL range query: invalid timeout: %v",
query.Timeout)
}

q.Timeout = int64(f)
} else {
q.Period = int64(d.Seconds())
}
}

qBuf, err := encodeJSON(q)
if err != nil {
return nil, err
}

bBuf, err := io.ReadAll(qBuf)
if err != nil {
return nil, fmt.Errorf("unable to read request body buffer: %w", err)
}

// CAQL extension does not like the JSON in the request body to end with \n.
if strings.HasSuffix(string(bBuf), "\n") {
bBuf = bBuf[:len(bBuf)-1]
}

var r *PromQLResponse

body, _, rErr := sc.DoRequestContext(ctx, node, "POST", u,
bytes.NewBuffer(bBuf), nil)

if body == nil {
return nil, rErr
}

buf, err := io.ReadAll(body)
if err != nil {
return nil, fmt.Errorf("unable to read response body buffer: %w",
err)
}

if rErr != nil {
r = &PromQLResponse{
Status: "error",
ErrorType: "database",
Error: string(buf),
}

var cErr *CAQLError

err := decodeJSON(bytes.NewBuffer(buf), &cErr)
if err != nil {
return nil, fmt.Errorf("unable to decode error response: %w",
err)
}

if cErr != nil && cErr.Message() != "" {
r.ErrorType = "caql"
r.Error = cErr.Message()
}

rErr = &PromQLError{
Status: r.Status,
ErrorType: r.ErrorType,
Err: r.Error,
}
} else {
if err := decodeJSON(bytes.NewBuffer(buf), &r); err != nil {
return nil, fmt.Errorf("unable to decode PromQL response: %w",
err)
}
}

return r, rErr
}

// PromQLRangeQuery evaluates a PromQL query over a range of time.
func (sc *SnowthClient) PromQLRangeQuery(query *PromQLRangeQuery,
nodes ...*SnowthNode,
Expand All @@ -89,7 +244,7 @@ func (sc *SnowthClient) PromQLRangeQueryContext(ctx context.Context,
}

if query == nil {
return nil, fmt.Errorf("invalid PromQL query: null")
return nil, fmt.Errorf("invalid PromQL range query: null")
}

u := "/extension/lua/public/caql_v1"
Expand All @@ -99,8 +254,6 @@ func (sc *SnowthClient) PromQLRangeQueryContext(ctx context.Context,
Format: "PQR",
}

q.Format = "PQR"

if query.AccountID != "" {
i, err := strconv.ParseInt(query.AccountID, 10, 64)
if err != nil {
Expand Down
124 changes: 122 additions & 2 deletions promql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,28 @@ const testPromQLError = `{
"error": "test"
}`

const testPromQLResponse = `{
const testPromQLInstantQueryResponse = `{
"status": "success",
"data": {
"result": [
{
"value": [
[
1676388600,
"3568"
]
],
"metric": {
"__name__": "bytes",
"__check_uuid": "09fc1c4e-8540-49a8-a109-8895553718fc"
}
}
],
"resulttype": "vector"
}
}`

const testPromQLRangeQueryResponse = `{
"status": "success",
"data": {
"result": [
Expand Down Expand Up @@ -49,6 +70,105 @@ const testPromQLResponse = `{
}
}`

func TestPromQLInstantQuery(t *testing.T) {
t.Parallel()

ms := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,
r *http.Request,
) {
if r.RequestURI == "/state" {
_, _ = w.Write([]byte(stateTestData))

return
}

if r.RequestURI == "/stats.json" {
_, _ = w.Write([]byte(statsTestData))

return
}

if r.Method == "POST" && strings.HasPrefix(r.RequestURI,
"/extension/lua/public/caql_v1") {
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(testPromQLError))

return
}

if len(b) == 0 {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(testPromQLError))

return
}

if strings.Contains(string(b), "127") {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(testCAQLError))

return
}

_, _ = w.Write([]byte(testPromQLInstantQueryResponse))

return
}
}))

defer ms.Close()

sc, err := NewClient(context.Background(),
&Config{Servers: []string{ms.URL}})
if err != nil {
t.Fatal("Unable to create snowth client", err)
}

sc.SetRetries(1)
sc.SetConnectRetries(1)

u, err := url.Parse(ms.URL)
if err != nil {
t.Fatal("Invalid test URL")
}

node := &SnowthNode{url: u}

res, err := sc.PromQLInstantQuery(&PromQLInstantQuery{
AccountID: "1",
Query: "test",
Time: "300",
}, node)
if err != nil {
t.Fatal(err)
}

if res.Data == nil {
t.Fatalf("Expected data: 2, got: %v", res.Data)
}

res, err = sc.PromQLInstantQuery(&PromQLInstantQuery{
AccountID: "1",
Query: "test",
Time: "127",
}, node)
if err == nil {
t.Fatal("Expected PromQL error response")
}

if res.ErrorType != "caql" {
t.Errorf("Expected error type: caql, got: %v", res.ErrorType)
}

exp := "Function not found: histograms"

if res.Error != exp {
t.Errorf("Expected error: %v, got: %v", exp, res.Error)
}
}

func TestPromQLRangeQuery(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -91,7 +211,7 @@ func TestPromQLRangeQuery(t *testing.T) {
return
}

_, _ = w.Write([]byte(testPromQLResponse))
_, _ = w.Write([]byte(testPromQLRangeQueryResponse))

return
}
Expand Down

0 comments on commit 54111db

Please sign in to comment.