From b92e72e25eba8e50fe3c26db065c1d9835735a5d Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:24:31 +0200 Subject: [PATCH 01/12] Updating Dockerfile --- lib/api.js | 11 ++++++++--- lib/metrics.js | 0 package.json | 3 ++- 3 files changed, 10 insertions(+), 4 deletions(-) create mode 100644 lib/metrics.js diff --git a/lib/api.js b/lib/api.js index b54095d..935819b 100644 --- a/lib/api.js +++ b/lib/api.js @@ -8,6 +8,10 @@ // var timers = JSON.stringify(targets); console.log(targets); var timers_copy = []; + + // Filter out and prepare the target data for API + // TODO Probably better to do this with a reduce filter in underscore than a loop. + _.each(targets, function(value, key) { var target = { "lastCollected":value.lastCollected, @@ -16,19 +20,20 @@ }; timers_copy.push(target); }); + res.json(timers_copy); }); app.get('/v1/metrics', function(req, res) { - // Metrics should be here + // TODO Metrics should be here var metrics = {}; res.json(metrics); - + }); app.delete('/v1/timer/:id', function(req, res) { - // TODO + // TODO add a delete endpoint to force remove targets? Probably easier just to recommend to restart the container }); app.listen(express_port); diff --git a/lib/metrics.js b/lib/metrics.js new file mode 100644 index 0000000..e69de29 diff --git a/package.json b/package.json index d03dd82..5fdca9a 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,8 @@ "description": "Uses Marathon service discovery API to collect and forward metrics readings from applications on a 'pull' based model. Marathon Metrics Collector makes no assumption about what format the metrics are in, just collects and forwards them to a receiver (in this case Kafka but could be extended to any backend). ", "main": "index.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" + "test": "echo \"Error: no test specified\" && exit 1", + "start": "node index.js" }, "author": "Jonathan Bennett", "license": "MIT", From c461a0654caca7602c4b9793971146e0356905c4 Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:25:06 +0200 Subject: [PATCH 02/12] Added health endpoint --- lib/api.js | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/api.js b/lib/api.js index 935819b..3f0a490 100644 --- a/lib/api.js +++ b/lib/api.js @@ -32,6 +32,14 @@ }); + app.get('/v1/health', function(req, res) { + + // TODO HEALTH should be here + var health = {}; + res.json(health); + + }); + app.delete('/v1/timer/:id', function(req, res) { // TODO add a delete endpoint to force remove targets? Probably easier just to recommend to restart the container }); From 1d8ae1feb7b69842b7e0cb711b17440dfc0a2a1d Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:32:33 +0200 Subject: [PATCH 03/12] Removing a console.log --- index.js | 2 +- lib/api.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 2cb89b2..3bd48b1 100644 --- a/index.js +++ b/index.js @@ -115,7 +115,7 @@ var send_to_kafka = function(messages) { { topic: kafka_topic, messages: messages } ]; producer.send(payloads, function (err, data) { - console.log(data); + // console.log(data); console.log(err); metric_response = null; }); diff --git a/lib/api.js b/lib/api.js index 3f0a490..5e0839f 100644 --- a/lib/api.js +++ b/lib/api.js @@ -6,7 +6,7 @@ app.get('/v1/timers', function(req, res) { // var timers = JSON.stringify(targets); - console.log(targets); + // console.log(targets); var timers_copy = []; // Filter out and prepare the target data for API From 5787b6a64320e71b7a8651f5edc1e18a94f4643d Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:32:43 +0200 Subject: [PATCH 04/12] Adding informative kafka producer log error --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 3bd48b1..fc93807 100644 --- a/index.js +++ b/index.js @@ -116,7 +116,7 @@ var send_to_kafka = function(messages) { ]; producer.send(payloads, function (err, data) { // console.log(data); - console.log(err); + console.log("Kafka producer error", err); metric_response = null; }); } From c70ff681b096e973a5318d39adfaea58edadedb1 Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:36:53 +0200 Subject: [PATCH 05/12] Bug with adding new timers... --- index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/index.js b/index.js index fc93807..49c7a2a 100644 --- a/index.js +++ b/index.js @@ -79,6 +79,7 @@ var refresh_targets = function(callback) { var register_scrapers = function() { // console.log(targets); + targets = null; _.each(targets, function(target) { start_scraping(target); }); From c5d4360625d2ee271b0bb5112e4e396a0efe505b Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:39:41 +0200 Subject: [PATCH 06/12] Excluding repeating timers --- index.js | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/index.js b/index.js index 49c7a2a..007b1e9 100644 --- a/index.js +++ b/index.js @@ -50,19 +50,21 @@ var refresh_targets = function(callback) { if(app.labels.metrics_endpoint) { // console.log(app); _.each(app.tasks, function(task,key) { - var target_def = { - lastCollected: null, - lastResult: null, - target: { - "id":task.id, - "appId":task.appId, - "host":task.host, - "port":app.labels.metrics_endpoint_port || task.ports[0] || null, - "metrics_endpoint":app.labels.metrics_endpoint, - "frequency": app.labels.metrics_frequency || 5 - } - }; - targets.push(target_def); + if(targets, _.indexOf(task.id), function() { + var target_def = { + lastCollected: null, + lastResult: null, + target: { + "id":task.id, + "appId":task.appId, + "host":task.host, + "port":app.labels.metrics_endpoint_port || task.ports[0] || null, + "metrics_endpoint":app.labels.metrics_endpoint, + "frequency": app.labels.metrics_frequency || 5 + } + }; + targets.push(target_def); + }); }); } }); @@ -79,7 +81,6 @@ var refresh_targets = function(callback) { var register_scrapers = function() { // console.log(targets); - targets = null; _.each(targets, function(target) { start_scraping(target); }); From 0c5a00975f1ba8de85ae30747184754f92acc863 Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:42:06 +0200 Subject: [PATCH 07/12] Lazy developing on an iPad... Bad JB --- index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 007b1e9..f03c59b 100644 --- a/index.js +++ b/index.js @@ -50,7 +50,7 @@ var refresh_targets = function(callback) { if(app.labels.metrics_endpoint) { // console.log(app); _.each(app.tasks, function(task,key) { - if(targets, _.indexOf(task.id), function() { + if(targets, _.indexOf(task.id) == -1) { var target_def = { lastCollected: null, lastResult: null, @@ -64,7 +64,7 @@ var refresh_targets = function(callback) { } }; targets.push(target_def); - }); + } }); } }); From 32355a93fb617e1fe98211f3413ed60b16475e74 Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:51:21 +0200 Subject: [PATCH 08/12] Fixing error logging --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index f03c59b..9cf7e16 100644 --- a/index.js +++ b/index.js @@ -118,7 +118,7 @@ var send_to_kafka = function(messages) { ]; producer.send(payloads, function (err, data) { // console.log(data); - console.log("Kafka producer error", err); + if(err) { console.log("Kafka producer error", err); } metric_response = null; }); } From 1f390ce7294c3250c6209ff898291ef91475e655 Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 00:55:05 +0200 Subject: [PATCH 09/12] Hopefully fixing repeating apps --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 9cf7e16..9e6b96a 100644 --- a/index.js +++ b/index.js @@ -50,7 +50,7 @@ var refresh_targets = function(callback) { if(app.labels.metrics_endpoint) { // console.log(app); _.each(app.tasks, function(task,key) { - if(targets, _.indexOf(task.id) == -1) { + if(_.findIndex(targets, {"target":{"id":task.id}}) === -1) { var target_def = { lastCollected: null, lastResult: null, From fca109f54b6622e042453818f1aee316e5f80214 Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 22:51:53 +0200 Subject: [PATCH 10/12] Cleaned up duplicate timers.. this time with testing of code before pushing it --- index.js | 191 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 101 insertions(+), 90 deletions(-) diff --git a/index.js b/index.js index 9e6b96a..cf5b149 100644 --- a/index.js +++ b/index.js @@ -1,11 +1,11 @@ // External dependencies _ = require('underscore'), -request = require('request'), -kafka = require('kafka-node'); + request = require('request'), + kafka = require('kafka-node'); // Globals -targets = []; +targets = {}; timers = []; // Application dependencies @@ -27,124 +27,135 @@ var HighLevelProducer = kafka.HighLevelProducer, producer = new HighLevelProducer(client); -producer.on('error', function (err) { - console.log("Could not connect to Kafka"); +producer.on('error', function(err) { + console.log("Could not connect to Kafka"); }); -producer.on('ready', function () { - refresh_targets(function() { - setup_marathon_sd_interval(); - setup_clearup_interval(); - }); +producer.on('ready', function() { + refresh_targets(function() { + setup_marathon_sd_interval(); + setup_clearup_interval(); + }); }); // Setup the Marathon connection -var marathon = require('marathon-node')(marathon_url, {"logTime":true}); +var marathon = require('marathon-node')(marathon_url, { + "logTime": true +}); var refresh_targets = function(callback) { - marathon.app - .getList({embed:"apps.tasks",label:"metrics_endpoint"}) - .then(function(data) { - _.each(data.apps, function(app, key) { - if(app.labels.metrics_endpoint) { - // console.log(app); - _.each(app.tasks, function(task,key) { - if(_.findIndex(targets, {"target":{"id":task.id}}) === -1) { - var target_def = { - lastCollected: null, - lastResult: null, - target: { - "id":task.id, - "appId":task.appId, - "host":task.host, - "port":app.labels.metrics_endpoint_port || task.ports[0] || null, - "metrics_endpoint":app.labels.metrics_endpoint, - "frequency": app.labels.metrics_frequency || 5 - } - }; - targets.push(target_def); + marathon.app + .getList({ + embed: "apps.tasks", + label: "metrics_endpoint" + }) + .then(function(data) { + // console.log(data); + _.each(data.apps, function(app, key) { + if (app.labels.metrics_endpoint) { + _.each(app.tasks, function(task, key) { + if (!targets.hasOwnProperty(task.id)) { + var target_def = { + lastCollected: null, + lastResult: null, + target: { + "id": task.id, + "appId": task.appId, + "host": task.host, + "port": app.labels.metrics_endpoint_port || task.ports[0] || null, + "metrics_endpoint": app.labels.metrics_endpoint, + "frequency": app.labels.metrics_frequency || 5 + } + }; + // console.log + targets[task.id] = target_def; + } + }); + }; + }); + register_scrapers(); + if (typeof callback == "function") { + callback(); } - }); - } - }); - register_scrapers(); - if(typeof callback == "function") { - callback(); - } - }) - .catch(function(error) { - // console.log(error); - if(error.name == "RequestError") { console.error("Marathon address not reachable. Maybe incorrect value?"); } - }); + }) + .catch(function(error) { + if (error.name == "RequestError") { + console.error("Marathon address not reachable. Maybe incorrect URL?"); + } + }); } var register_scrapers = function() { - // console.log(targets); - _.each(targets, function(target) { - start_scraping(target); - }); + _.each(targets, function(target) { + start_scraping(target); + }); } var start_scraping = function(targetInstance) { - targetInstance.interval = setInterval(function() { - request('http://'+targetInstance.target.host+':'+targetInstance.target.port+targetInstance.target.metrics_endpoint, function (error, response, body) { - targetInstance.lastCollected = (new Date().getTime()); - targetInstance.lastResult = { - resultCode:response.statusCode, - error:error || null - } - if (!error && response.statusCode == 200) { - var metric_response = { - "id":targetInstance.target.id, - "appId":targetInstance.target.appId, - "metrics":body, - "timestamp":(new Date().getTime()) - } + targetInstance.interval = setInterval(function() { + request('http://' + targetInstance.target.host + ':' + targetInstance.target.port + targetInstance.target.metrics_endpoint, function(error, response, body) { + targetInstance.lastCollected = (new Date().getTime()); + targetInstance.lastResult = { + resultCode: response.statusCode, + error: error || null + } + if (!error && response.statusCode == 200) { + var metric_response = { + "id": targetInstance.target.id, + "appId": targetInstance.target.appId, + "metrics": body, + "timestamp": (new Date().getTime()) + } - var messages = []; - messages.push(JSON.stringify(metric_response)); + var messages = []; + messages.push(JSON.stringify(metric_response)); - send_to_kafka(messages); + send_to_kafka(messages); - } - }); - }, targetInstance.target.frequency * 1000); + } + }); + }, targetInstance.target.frequency * 1000); } var send_to_kafka = function(messages) { - payloads = [ - { topic: kafka_topic, messages: messages } - ]; - producer.send(payloads, function (err, data) { - // console.log(data); - if(err) { console.log("Kafka producer error", err); } - metric_response = null; - }); + payloads = [{ + topic: kafka_topic, + messages: messages + }]; + producer.send(payloads, function(err, data) { + // console.log(data); + if (err) { + console.log("Kafka producer error", err); + } + metric_response = null; + }); } // Loops and timers var setup_marathon_sd_interval = function() { - refresh_interval = setInterval = setInterval(function() { - refresh_targets(); - }, (refresh_frequency * 1000)) + // console.log(refresh_frequency); + refresh_interval = setInterval(function() { + refresh_targets(); + // console.log("Refreshing targets"); + }, (refresh_frequency * 1000)) } // Container removal clean up var setup_clearup_interval = function() { - clearup_interval = setInterval(function() { - cleanup_scrapers(); - }, (clearup_frequency * 1000)); + clearup_interval = setInterval(function() { + cleanup_scrapers(); + }, (clearup_frequency * 1000)); } var cleanup_scrapers = function() { - console.log("Beginning clearup"); - _.each(targets, function(target, key) { - if((target.lastResult != "200" || target.lastResult != "201") && (((new Date().getTime()) + (clearup_timeout * 1000)) > target.lastCollected)) { - // Remove the scraper - clearInterval(target.interval); - // Publish a removal event - } - }); + console.log("Beginning clearup"); + _.each(targets, function(target, key) { + if ((target.lastResult != "200" || target.lastResult != "201") && (((new Date().getTime()) + (clearup_timeout * 1000)) > target.lastCollected)) { + // Remove the scraper + clearInterval(target.interval); + // Publish a removal event + } + }); } From fb8105b5fe692052637d1229a1b77c3e41dc36ab Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 23:03:36 +0200 Subject: [PATCH 11/12] Removing noisy logs --- index.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index cf5b149..9f9d755 100644 --- a/index.js +++ b/index.js @@ -39,9 +39,7 @@ producer.on('ready', function() { }); // Setup the Marathon connection -var marathon = require('marathon-node')(marathon_url, { - "logTime": true -}); +var marathon = require('marathon-node')(marathon_url); var refresh_targets = function(callback) { @@ -150,7 +148,7 @@ var setup_clearup_interval = function() { } var cleanup_scrapers = function() { - console.log("Beginning clearup"); + // console.log("Beginning clearup"); _.each(targets, function(target, key) { if ((target.lastResult != "200" || target.lastResult != "201") && (((new Date().getTime()) + (clearup_timeout * 1000)) > target.lastCollected)) { // Remove the scraper From 4a89ade329fa62a309ac3e3fb4f50095af056553 Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Fri, 4 Nov 2016 23:10:50 +0200 Subject: [PATCH 12/12] Adding metrics endpoint --- index.js | 3 ++- lib/api.js | 3 +-- package.json | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index 9f9d755..e74f78a 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,8 @@ // External dependencies _ = require('underscore'), request = require('request'), - kafka = require('kafka-node'); + kafka = require('kafka-node'), + express_node_metrics = require('express-node-metrics').metrics; // Globals diff --git a/lib/api.js b/lib/api.js index 5e0839f..3645ef5 100644 --- a/lib/api.js +++ b/lib/api.js @@ -27,8 +27,7 @@ app.get('/v1/metrics', function(req, res) { // TODO Metrics should be here - var metrics = {}; - res.json(metrics); + res.send(express_node_metrics.getAll(req.query.reset)); }); diff --git a/package.json b/package.json index 5fdca9a..08d90d7 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ "license": "MIT", "dependencies": { "express": "4.14.0", + "express-node-metrics": "^1.2.0", "kafka-node": "1.0.3", "marathon-node": "1.0.2", "request": "2.76.0",