Skip to content

Commit

Permalink
URL decode labels in the URL
Browse files Browse the repository at this point in the history
This fixes #27

Signed-off-by: sinkingpoint <[email protected]>
  • Loading branch information
sinkingpoint committed Nov 17, 2023
1 parent a942629 commit 8359816
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 7 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ reqwest = { optional = true, version="0.11.10" }
twox-hash = { optional = true, version = "1.6.3" }
base64 = "0.13"
anyhow = "1.0"
urlencoding = "2.1.3"

[features]
default = ["tls", "auth", "clustering"]
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ mod clustering;

#[cfg(test)]
mod aggregator_test;
#[cfg(test)]
mod routes_test;
mod auth;

use tokio::signal;
Expand Down
31 changes: 24 additions & 7 deletions src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::HashMap, sync::Arc, convert::Infallible};

use reqwest::StatusCode;
use urlencoding::decode;
use warp::{Filter, http::HeaderValue, hyper::{HeaderMap, body::Bytes}, path::Tail, reject::Reject};

use crate::{aggregator::{AggregationError, Aggregator}, auth::Authenticator};
Expand Down Expand Up @@ -68,7 +69,7 @@ async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, std:
Some(GravelError::AuthError) => Ok(warp::reply::with_status(String::from("FORBIDDEN"), StatusCode::FORBIDDEN)),
Some(GravelError::AggregationError(err)) => Ok(warp::reply::with_status(err.to_string(), StatusCode::BAD_REQUEST)),
Some(GravelError::Error(err)) => Ok(warp::reply::with_status(err.clone(), StatusCode::BAD_REQUEST)),
None => Ok(warp::reply::with_status(String::from("INTERNAL_SERVER_ERROR"), StatusCode::INTERNAL_SERVER_ERROR)),
None => Ok(warp::reply::with_status(String::new(), StatusCode::NOT_FOUND)),
}
}

Expand Down Expand Up @@ -111,22 +112,33 @@ async fn ingest_metrics<T>(
) -> Result<impl warp::Reply, warp::Rejection> {
let labels = {
let mut labelset = HashMap::new();
let mut labels = url_tail.as_str().split("/").peekable();
let mut labels = url_tail.as_str().split("/").map(|s| decode(s)).peekable();
while labels.peek().is_some() {
let name = labels.next().unwrap();
let label_name = labels.next().unwrap();
let name = match label_name {
Ok(s) => s.into_owned(),
Err(_) => return Err(warp::reject::custom(GravelError::Error("Invalid label name".into())))
};

if name.is_empty() {
break;
}
let value = labels.next().unwrap_or_default();

let value = match labels.next() {
Some(Ok(s)) => s.into_owned(),
Some(Err(_)) => return Err(warp::reject::custom(GravelError::Error("Invalid label value".into()))),
None => return Err(warp::reject::custom(GravelError::Error("Label value missing".into())))
};

labelset.insert(name, value);
}
labelset
};

// We're clustering, so might need to forward the metrics
if let Some(cluster_conf) = conf.cluster_conf.as_ref() {
let job = labels.get("job").unwrap_or(&"");
if let Some(peer) = cluster_conf.get_peer_for_key(job) {
let job = labels.get("job").map(|s| s.to_owned()).unwrap_or(String::new());
if let Some(peer) = cluster_conf.get_peer_for_key(&job) {
if !cluster_conf.is_self(peer) {
match forward_to_peer(peer, data, url_tail).await {
Ok(_) => return Ok(""),
Expand All @@ -143,7 +155,12 @@ async fn ingest_metrics<T>(
}
};

match agg.parse_and_merge(&body, &labels).await {
let mut str_labels = HashMap::new();
for (k, v) in labels.iter() {
str_labels.insert(k.as_str(), v.as_str());
}

match agg.parse_and_merge(&body, &str_labels).await {
Ok(_) => Ok(""),
Err(e) => Err(warp::reject::custom(GravelError::AggregationError(e))),
}
Expand Down
42 changes: 42 additions & 0 deletions src/routes_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::net::SocketAddr;

use crate::{routes::{self, RoutesConfig}, aggregator::Aggregator, auth::pass_through_auth};
use tokio::time::sleep;

#[tokio::test]
async fn test_27() {
// https://github.com/sinkingpoint/prometheus-gravel-gateway/issues/27

let agg = Aggregator::new();
let config = RoutesConfig{
authenticator: Box::new(pass_through_auth()),
#[cfg(feature="clustering")]
cluster_conf: None
};

let routes = routes::get_routes(agg, config);
let server = warp::serve(routes);
let server = tokio::spawn(server.run(SocketAddr::V4("127.0.0.1:4278".parse().unwrap())));

// wait a bit for the server to come up.
sleep(tokio::time::Duration::from_millis(500)).await;

let client = reqwest::Client::new();
let res = client.post("http://127.0.0.1:4278/metrics/job/localhost%3A80").body("test_metric 1
").send().await.unwrap();
assert_eq!(res.status(), 200);

let res = client.get("http://127.0.0.1:4278/metrics").send().await.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.text().await.unwrap(), "test_metric{job=\"localhost:80\"} 1\n");

let res = client.post("http://127.0.0.1:4278/metrics/job/localhost:80").body("test_metric 2
").send().await.unwrap();
assert_eq!(res.status(), 200);

let res = client.get("http://127.0.0.1:4278/metrics").send().await.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.text().await.unwrap(), "test_metric{job=\"localhost:80\"} 3\n");

server.abort();
}

0 comments on commit 8359816

Please sign in to comment.