diff --git a/Cargo.lock b/Cargo.lock index f727d2c..688d412 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -480,6 +480,7 @@ dependencies = [ "trust-dns-proto", "trust-dns-resolver", "twox-hash", + "urlencoding", "warp", ] @@ -1921,6 +1922,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" diff --git a/Cargo.toml b/Cargo.toml index c0f63eb..7868306 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ reqwest = { optional = true, version="0.11.10" } twox-hash = { optional = true, version = "1.6.3" } base64 = "0.13" anyhow = "1.0" +urlencoding = "2.1.3" [features] default = ["tls", "auth", "clustering"] diff --git a/src/main.rs b/src/main.rs index 35f00b6..ed752dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,8 @@ mod clustering; #[cfg(test)] mod aggregator_test; +#[cfg(test)] +mod routes_test; mod auth; use tokio::signal; diff --git a/src/routes.rs b/src/routes.rs index c0724af..af4183a 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, sync::Arc, convert::Infallible}; use reqwest::StatusCode; +use urlencoding::decode; use warp::{Filter, http::HeaderValue, hyper::{HeaderMap, body::Bytes}, path::Tail, reject::Reject}; use crate::{aggregator::{AggregationError, Aggregator}, auth::Authenticator}; @@ -68,7 +69,7 @@ async fn handle_rejection(err: warp::Rejection) -> Result Ok(warp::reply::with_status(String::from("FORBIDDEN"), StatusCode::FORBIDDEN)), Some(GravelError::AggregationError(err)) => Ok(warp::reply::with_status(err.to_string(), StatusCode::BAD_REQUEST)), Some(GravelError::Error(err)) => Ok(warp::reply::with_status(err.clone(), StatusCode::BAD_REQUEST)), - None => Ok(warp::reply::with_status(String::from("INTERNAL_SERVER_ERROR"), StatusCode::INTERNAL_SERVER_ERROR)), + None => Ok(warp::reply::with_status(String::new(), StatusCode::NOT_FOUND)), } } @@ -111,13 +112,24 @@ async fn ingest_metrics( ) -> Result { let labels = { let mut labelset = HashMap::new(); - let mut labels = url_tail.as_str().split("/").peekable(); + let mut labels = url_tail.as_str().split("/").map(|s| decode(s)).peekable(); while labels.peek().is_some() { - let name = labels.next().unwrap(); + let label_name = labels.next().unwrap(); + let name = match label_name { + Ok(s) => s.into_owned(), + Err(_) => return Err(warp::reject::custom(GravelError::Error("Invalid label name".into()))) + }; + if name.is_empty() { break; } - let value = labels.next().unwrap_or_default(); + + let value = match labels.next() { + Some(Ok(s)) => s.into_owned(), + Some(Err(_)) => return Err(warp::reject::custom(GravelError::Error("Invalid label value".into()))), + None => return Err(warp::reject::custom(GravelError::Error("Label value missing".into()))) + }; + labelset.insert(name, value); } labelset @@ -125,8 +137,8 @@ async fn ingest_metrics( // We're clustering, so might need to forward the metrics if let Some(cluster_conf) = conf.cluster_conf.as_ref() { - let job = labels.get("job").unwrap_or(&""); - if let Some(peer) = cluster_conf.get_peer_for_key(job) { + let job = labels.get("job").map(|s| s.to_owned()).unwrap_or(String::new()); + if let Some(peer) = cluster_conf.get_peer_for_key(&job) { if !cluster_conf.is_self(peer) { match forward_to_peer(peer, data, url_tail).await { Ok(_) => return Ok(""), @@ -143,7 +155,12 @@ async fn ingest_metrics( } }; - match agg.parse_and_merge(&body, &labels).await { + let mut str_labels = HashMap::new(); + for (k, v) in labels.iter() { + str_labels.insert(k.as_str(), v.as_str()); + } + + match agg.parse_and_merge(&body, &str_labels).await { Ok(_) => Ok(""), Err(e) => Err(warp::reject::custom(GravelError::AggregationError(e))), } diff --git a/src/routes_test.rs b/src/routes_test.rs new file mode 100644 index 0000000..fa61120 --- /dev/null +++ b/src/routes_test.rs @@ -0,0 +1,42 @@ +use std::net::SocketAddr; + +use crate::{routes::{self, RoutesConfig}, aggregator::Aggregator, auth::pass_through_auth}; +use tokio::time::sleep; + +#[tokio::test] +async fn test_27() { + // https://github.com/sinkingpoint/prometheus-gravel-gateway/issues/27 + + let agg = Aggregator::new(); + let config = RoutesConfig{ + authenticator: Box::new(pass_through_auth()), + #[cfg(feature="clustering")] + cluster_conf: None + }; + + let routes = routes::get_routes(agg, config); + let server = warp::serve(routes); + let server = tokio::spawn(server.run(SocketAddr::V4("127.0.0.1:4278".parse().unwrap()))); + + // wait a bit for the server to come up. + sleep(tokio::time::Duration::from_millis(500)).await; + + let client = reqwest::Client::new(); + let res = client.post("http://127.0.0.1:4278/metrics/job/localhost%3A80").body("test_metric 1 +").send().await.unwrap(); + assert_eq!(res.status(), 200); + + let res = client.get("http://127.0.0.1:4278/metrics").send().await.unwrap(); + assert_eq!(res.status(), 200); + assert_eq!(res.text().await.unwrap(), "test_metric{job=\"localhost:80\"} 1\n"); + + let res = client.post("http://127.0.0.1:4278/metrics/job/localhost:80").body("test_metric 2 +").send().await.unwrap(); + assert_eq!(res.status(), 200); + + let res = client.get("http://127.0.0.1:4278/metrics").send().await.unwrap(); + assert_eq!(res.status(), 200); + assert_eq!(res.text().await.unwrap(), "test_metric{job=\"localhost:80\"} 3\n"); + + server.abort(); +} \ No newline at end of file