Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Add configuration to enable/disable cascading feature and set cascadi…
Browse files Browse the repository at this point in the history
…ng logs to debug (#1311)
  • Loading branch information
qwu16 authored Feb 24, 2023
1 parent 467901c commit 1246a33
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 127 deletions.
26 changes: 25 additions & 1 deletion doc/design/quic-cascading.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,37 @@ node .

This is just a sample to show how cascading clusters control work, please customize your own scheduling and secured deployment in real practice.

### ManagementAPI configuration

management_api module will report restful server info to cluster manager and then report to cloud control, so that cloud control could communicate with cluster management_api for restful API, please configure following items in cascading item in ```management_api/management_api.toml```:

````
[cascading]
enabled: set true to enable cascading feature, disable cascading by default
servicename: service name used to register to cascading cloud for accessing to communicate with this cluster
````

### ClusterManager configuration

The OWT cluster will report cluster info to cloud control service through cluster_manager module in OWT, you need to configure following items in cascading item in cluster_manager/cluster_manager.toml file before starting it:
The OWT cluster will report cluster info to cloud control service through cluster_manager module in OWT, you need to configure following items in cascading item in ```cluster_manager/cluster_manager.toml``` file before starting it:

````
[cascading]
enabled: set true to enable cascading feature, disable cascading by default
url: specify the cloud control url, so that cluster manager module can connect to the url and send cluster info
region: specify the region this OWT cluster locate, this will be used by cloud control service to schedule incoming clients by region
clusterID: specify a unique cluster ID for this cluster in the cloud.
````

### Conference configuration

conference events will be sent to cascaded clusters through ```conference_agent``` and eventbridge, configure following item in cascading item in ```conference_agent/agent.toml``` to enable event cascading:

````
[cascading]
enabled: set true to enable cascading feature, disable cascading by default
````


### Media/event bridge

Expand Down
3 changes: 3 additions & 0 deletions source/agent/conference/agent.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ dataBaseURL = "localhost/owtdb" #default: "localhost/owtdb"
[internal]
# tcp/sctp available, tcp is default
protocol = "tcp"

[cascading]
enabled = false # disable cascading feature by default
151 changes: 82 additions & 69 deletions source/agent/conference/conference.js

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions source/cluster_manager/clusterManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ var ClusterManager = function (clusterName, selfId, spec) {

function validateUrl(url) {
try {
new Url.URL(url);
return true;
if (spec.enableCascading) {
new Url.URL(url);
return true;
}
return false;
} catch {
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion source/cluster_manager/cluster_manager.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ analytics = "least-used"
eventbridge = "last-used"
mediabridge = "last-used"

[cloud]
[cascading]
enabled = false #disable cascading feature by default
url = "none" #default none:not connect to cascading cloud, specify cloud service url for the cluster to register to cloud
region = "BJ"
clusterID = "" #A unique cluster ID reporting to cascading cloud
9 changes: 5 additions & 4 deletions source/cluster_manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ config.rabbit = config.rabbit || {};
config.rabbit.host = config.rabbit.host || 'localhost';
config.rabbit.port = config.rabbit.port || 5672;

config.cloud.url = config.cloud.url;
config.cloud.region = config.cloud.region;
config.cloud.clusterID = config.cloud.clusterID;
config.cascading.enabled = config.cascading.enabled || false;
config.cascading.url = config.cascading.url;
config.cascading.region = config.cascading.region;
config.cascading.clusterID = config.cascading.clusterID;

function startup () {
var id = Math.floor(Math.random() * 1000000000);
Expand All @@ -54,7 +55,7 @@ function startup () {
checkAliveCount: config.manager.check_alive_count,
scheduleKeepTime: config.manager.schedule_reserve_time,
strategy: config.strategy,
url: config.cloud.url, region: config.cloud.region, clusterID: config.cloud.clusterID
enableCascading: config.cascading.enabled, url: config.cascading.url, region: config.cascading.region, clusterID: config.cascading.clusterID
};

if (config.manager.enable_grpc) {
Expand Down
105 changes: 56 additions & 49 deletions source/management_api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ var serverConfig = global.config.server || {};
var cluster = require("cluster");
var serverPort = serverConfig.port || 3000;
var numCPUs = serverConfig.numberOfProcess || 1;
var servicename = serverConfig.servicename || 'sampleService';


var cascadingConfig = global.config.cascading || {};
var enableCascading = cascadingConfig.enabled || false;
var servicename = cascadingConfig.servicename || 'sampleService';

var ip_address;
(function getPublicIP() {
Expand Down Expand Up @@ -179,62 +183,65 @@ if (cluster.isMaster) {
var dataAccess = require('./data_access');
dataAccess.token.genKey();

var registerInfo = undefined;

amqper.connect(global.config.rabbit, function () {
amqper.asRpcClient(function(rpcCli) {
var keepTrying = true;
var trySendInfo = function(attempts) {
if (attempts <= 0) {
log.info("Send register info to cluster manager timeout");
return
}
log.info("Send restful info to cluster manager:", registerInfo);
rpcCli.remoteCall(cluster_name, 'registerInfo', [registerInfo], { callback : function(result) {
if (result === 'timeout') {
if (keepTrying) {
log.info('Faild to register restful server info, keep trying.');
setTimeout(function() { trySendInfo(attempts - (result === 'timeout' ? 4 : 1)); }, 1000);
}
} else {
log.info('Send register info to cluster manager succeed');
keepTrying = false;
if (enableCascading) {
var registerInfo = undefined;

amqper.connect(global.config.rabbit, function () {
amqper.asRpcClient(function(rpcCli) {
var keepTrying = true;
var trySendInfo = function(attempts) {
if (attempts <= 0) {
log.info("Send register info to cluster manager timeout");
return
}
} }, 1000);
}
log.info("Send restful info to cluster manager:", registerInfo);
rpcCli.remoteCall(cluster_name, 'registerInfo', [registerInfo], { callback : function(result) {
if (result === 'timeout') {
if (keepTrying) {
log.info('Faild to register restful server info, keep trying.');
setTimeout(function() { trySendInfo(attempts - (result === 'timeout' ? 4 : 1)); }, 1000);
}
} else {
log.info('Send register info to cluster manager succeed');
keepTrying = false;
}
} }, 1000);
}

if (registerInfo != undefined) {
trySendInfo(5);
} else {
setTimeout(function() { trySendInfo(5); }, 1000);
}
if (registerInfo != undefined) {
trySendInfo(5);
} else {
setTimeout(function() { trySendInfo(5); }, 1000);
}
}, function(reason) {
log.error('Initializing as rpc client failed, reason:', reason);
process.exit();
});
}, function(reason) {
log.error('Initializing as rpc client failed, reason:', reason);
log.error('Connect to rabbitMQ server failed, reason:', reason);
process.exit();
});
}, function(reason) {
log.error('Connect to rabbitMQ server failed, reason:', reason);
process.exit();
});
});

dataAccess.service.list(function (err, sers) {
if (err) {
log.warn('Failed to get service:', err.message);
} else {
var serviceToCloud = sers.filter((t) => {return t.name === servicename;});
log.info('Representing service ', serviceToCloud);
var key = serviceToCloud[0].key;
if (serviceToCloud[0].encrypted === true) {
key = cipher.decrypt(cipher.k, key);
}
dataAccess.service.list(function (err, sers) {
if (err) {
log.warn('Failed to get service:', err.message);
} else {
var serviceToCloud = sers.filter((t) => {return t.name === servicename;});
log.info('Representing service ', serviceToCloud);
var key = serviceToCloud[0].key;
if (serviceToCloud[0].encrypted === true) {
key = cipher.decrypt(cipher.k, key);
}

registerInfo = {
resturl: url,
servicekey: key,
serviceid: serviceToCloud[0]._id
registerInfo = {
resturl: url,
servicekey: key,
serviceid: serviceToCloud[0]._id
}
}
}
});
});
}

for (var i = 0; i < numCPUs; i++) {
cluster.fork();
Expand Down
5 changes: 4 additions & 1 deletion source/management_api/management_api.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ numberOfProcess = 4 #default: 1
enableWebTransport = false #default: false.
hostname = "" #default: ""
ip_address = "" #default: ""
servicename = "sampleService" #service name used to register to cascading cloud for accessing to communicate with this cluster
#enable_grpc = true

[cluster]
Expand All @@ -21,3 +20,7 @@ port = 5672 #default: 5672

[mongo]
dataBaseURL = "localhost/owtdb" #default: "localhost/owtdb"

[cascading]
enabled = false # disable cascading feature by default
servicename = "sampleService" #service name used to register to cascading cloud for accessing to communicate with this cluster

0 comments on commit 1246a33

Please sign in to comment.