Skip to content

Commit

Permalink
Merge pull request #2 from JonathanBennett/WIP
Browse files Browse the repository at this point in the history
Merging to master.
  • Loading branch information
JonathanBennett authored Nov 4, 2016
2 parents 007f6b4 + 4a89ade commit 30e41c8
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 99 deletions.
196 changes: 104 additions & 92 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// External dependencies
_ = require('underscore'),
request = require('request'),
kafka = require('kafka-node');
request = require('request'),
kafka = require('kafka-node'),
express_node_metrics = require('express-node-metrics').metrics;


// Globals
targets = [];
targets = {};
timers = [];

// Application dependencies
Expand All @@ -27,122 +28,133 @@ var HighLevelProducer = kafka.HighLevelProducer,
producer = new HighLevelProducer(client);


producer.on('error', function (err) {
console.log("Could not connect to Kafka");
producer.on('error', function(err) {
console.log("Could not connect to Kafka");
});

producer.on('ready', function () {
refresh_targets(function() {
setup_marathon_sd_interval();
setup_clearup_interval();
});
producer.on('ready', function() {
refresh_targets(function() {
setup_marathon_sd_interval();
setup_clearup_interval();
});
});

// Setup the Marathon connection
var marathon = require('marathon-node')(marathon_url, {"logTime":true});
var marathon = require('marathon-node')(marathon_url);


var refresh_targets = function(callback) {
marathon.app
.getList({embed:"apps.tasks",label:"metrics_endpoint"})
.then(function(data) {
_.each(data.apps, function(app, key) {
if(app.labels.metrics_endpoint) {
// console.log(app);
_.each(app.tasks, function(task,key) {
var target_def = {
lastCollected: null,
lastResult: null,
target: {
"id":task.id,
"appId":task.appId,
"host":task.host,
"port":app.labels.metrics_endpoint_port || task.ports[0] || null,
"metrics_endpoint":app.labels.metrics_endpoint,
"frequency": app.labels.metrics_frequency || 5
}
};
targets.push(target_def);
});
}
});
register_scrapers();
if(typeof callback == "function") {
callback();
}
})
.catch(function(error) {
// console.log(error);
if(error.name == "RequestError") { console.error("Marathon address not reachable. Maybe incorrect value?"); }
});
marathon.app
.getList({
embed: "apps.tasks",
label: "metrics_endpoint"
})
.then(function(data) {
// console.log(data);
_.each(data.apps, function(app, key) {
if (app.labels.metrics_endpoint) {
_.each(app.tasks, function(task, key) {
if (!targets.hasOwnProperty(task.id)) {
var target_def = {
lastCollected: null,
lastResult: null,
target: {
"id": task.id,
"appId": task.appId,
"host": task.host,
"port": app.labels.metrics_endpoint_port || task.ports[0] || null,
"metrics_endpoint": app.labels.metrics_endpoint,
"frequency": app.labels.metrics_frequency || 5
}
};
// console.log
targets[task.id] = target_def;
}
});
};
});
register_scrapers();
if (typeof callback == "function") {
callback();
}
})
.catch(function(error) {
if (error.name == "RequestError") {
console.error("Marathon address not reachable. Maybe incorrect URL?");
}
});
}

var register_scrapers = function() {
// console.log(targets);
_.each(targets, function(target) {
start_scraping(target);
});
_.each(targets, function(target) {
start_scraping(target);
});
}

var start_scraping = function(targetInstance) {
targetInstance.interval = setInterval(function() {
request('http://'+targetInstance.target.host+':'+targetInstance.target.port+targetInstance.target.metrics_endpoint, function (error, response, body) {
targetInstance.lastCollected = (new Date().getTime());
targetInstance.lastResult = {
resultCode:response.statusCode,
error:error || null
}
if (!error && response.statusCode == 200) {
var metric_response = {
"id":targetInstance.target.id,
"appId":targetInstance.target.appId,
"metrics":body,
"timestamp":(new Date().getTime())
}

var messages = [];
messages.push(JSON.stringify(metric_response));

send_to_kafka(messages);

}
});
}, targetInstance.target.frequency * 1000);
targetInstance.interval = setInterval(function() {
request('http://' + targetInstance.target.host + ':' + targetInstance.target.port + targetInstance.target.metrics_endpoint, function(error, response, body) {
targetInstance.lastCollected = (new Date().getTime());
targetInstance.lastResult = {
resultCode: response.statusCode,
error: error || null
}
if (!error && response.statusCode == 200) {
var metric_response = {
"id": targetInstance.target.id,
"appId": targetInstance.target.appId,
"metrics": body,
"timestamp": (new Date().getTime())
}

var messages = [];
messages.push(JSON.stringify(metric_response));

send_to_kafka(messages);

}
});
}, targetInstance.target.frequency * 1000);
}

var send_to_kafka = function(messages) {
payloads = [
{ topic: kafka_topic, messages: messages }
];
producer.send(payloads, function (err, data) {
console.log(data);
console.log(err);
metric_response = null;
});
payloads = [{
topic: kafka_topic,
messages: messages
}];
producer.send(payloads, function(err, data) {
// console.log(data);
if (err) {
console.log("Kafka producer error", err);
}
metric_response = null;
});
}

// Loops and timers
var setup_marathon_sd_interval = function() {
refresh_interval = setInterval = setInterval(function() {
refresh_targets();
}, (refresh_frequency * 1000))
// console.log(refresh_frequency);
refresh_interval = setInterval(function() {
refresh_targets();
// console.log("Refreshing targets");
}, (refresh_frequency * 1000))
}

// Container removal clean up

var setup_clearup_interval = function() {
clearup_interval = setInterval(function() {
cleanup_scrapers();
}, (clearup_frequency * 1000));
clearup_interval = setInterval(function() {
cleanup_scrapers();
}, (clearup_frequency * 1000));
}

var cleanup_scrapers = function() {
console.log("Beginning clearup");
_.each(targets, function(target, key) {
if((target.lastResult != "200" || target.lastResult != "201") && (((new Date().getTime()) + (clearup_timeout * 1000)) > target.lastCollected)) {
// Remove the scraper
clearInterval(target.interval);
// Publish a removal event
}
});
// console.log("Beginning clearup");
_.each(targets, function(target, key) {
if ((target.lastResult != "200" || target.lastResult != "201") && (((new Date().getTime()) + (clearup_timeout * 1000)) > target.lastCollected)) {
// Remove the scraper
clearInterval(target.interval);
// Publish a removal event
}
});
}
24 changes: 18 additions & 6 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

app.get('/v1/timers', function(req, res) {
// var timers = JSON.stringify(targets);
console.log(targets);
// console.log(targets);
var timers_copy = [];

// Filter out and prepare the target data for API
// TODO Probably better to do this with a reduce filter in underscore than a loop.

_.each(targets, function(value, key) {
var target = {
"lastCollected":value.lastCollected,
Expand All @@ -16,19 +20,27 @@
};
timers_copy.push(target);
});

res.json(timers_copy);
});

app.get('/v1/metrics', function(req, res) {

// Metrics should be here
var metrics = {};
res.json(metrics);

// TODO Metrics should be here
res.send(express_node_metrics.getAll(req.query.reset));

});

app.get('/v1/health', function(req, res) {

// TODO HEALTH should be here
var health = {};
res.json(health);

});

app.delete('/v1/timer/:id', function(req, res) {
// TODO
// TODO add a delete endpoint to force remove targets? Probably easier just to recommend to restart the container
});

app.listen(express_port);
Empty file added lib/metrics.js
Empty file.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
"description": "Uses Marathon service discovery API to collect and forward metrics readings from applications on a 'pull' based model. Marathon Metrics Collector makes no assumption about what format the metrics are in, just collects and forwards them to a receiver (in this case Kafka but could be extended to any backend). ",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node index.js"
},
"author": "Jonathan Bennett",
"license": "MIT",
"dependencies": {
"express": "4.14.0",
"express-node-metrics": "^1.2.0",
"kafka-node": "1.0.3",
"marathon-node": "1.0.2",
"request": "2.76.0",
Expand Down

0 comments on commit 30e41c8

Please sign in to comment.