aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2016-10-18 09:55:28 -0700
committerGravatar murgatroid99 <mlumish@google.com>2016-10-18 09:55:28 -0700
commitb53e5d1f2b3a20299a2b65afffed6958a77ebfb6 (patch)
tree9fc19f0d786a7a90e3ce99d38c3ad54ddec2fb58 /src/node
parent58628ae7a70feff295fc7f44de60686439a2beca (diff)
Create benchmark client and server for Node Express
Diffstat (limited to 'src/node')
-rw-r--r--src/node/performance/benchmark_client_express.js289
-rw-r--r--src/node/performance/benchmark_server.js5
-rw-r--r--src/node/performance/benchmark_server_express.js107
-rw-r--r--src/node/performance/worker.js10
-rw-r--r--src/node/performance/worker_service_impl.js228
5 files changed, 530 insertions, 109 deletions
diff --git a/src/node/performance/benchmark_client_express.js b/src/node/performance/benchmark_client_express.js
new file mode 100644
index 0000000000..15bc1132d2
--- /dev/null
+++ b/src/node/performance/benchmark_client_express.js
@@ -0,0 +1,289 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark client module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var util = require('util');
+var EventEmitter = require('events');
+var http = require('http');
+var https = require('https');
+
+var async = require('async');
+var _ = require('lodash');
+var PoissonProcess = require('poisson-process');
+var Histogram = require('./histogram');
+
+/**
+ * Convert a time difference, as returned by process.hrtime, to a number of
+ * nanoseconds.
+ * @param {Array.<number>} time_diff The time diff, represented as
+ * [seconds, nanoseconds]
+ * @return {number} The total number of nanoseconds
+ */
+function timeDiffToNanos(time_diff) {
+ return time_diff[0] * 1e9 + time_diff[1];
+}
+
+function BenchmarkClient(server_targets, channels, histogram_params,
+ security_params) {
+ var options = {
+ method: 'PUT',
+ headers: {
+ 'Content-Type': 'application/json'
+ }
+ };
+ var protocol;
+ if (security_params) {
+ var ca_path;
+ protocol = https;
+ this.request = _.bind(https.request, https);
+ if (security_params.use_test_ca) {
+ ca_path = path.join(__dirname, '../test/data/ca.pem');
+ var ca_data = fs.readFileSync(ca_path);
+ options.ca = ca_data;
+ }
+ if (security_params.server_host_override) {
+ var host_override = security_params.server_host_override;
+ options.servername = host_override;
+ }
+ } else {
+ protocol = http;
+ }
+
+ this.request = _.bind(protocol.request, protocol);
+
+ this.client_options = [];
+
+ for (var i = 0; i < channels; i++) {
+ var new_options = _.assign({host: server_targets[i]}, options);
+ new_options.agent = new protocol.Agent(new_options);
+ this.client_options[i] = new_options;
+ }
+
+ this.histogram = new Histogram(histogram_params.resolution,
+ histogram_params.max_possible);
+
+ this.running = false;
+
+ this.pending_calls = 0;
+}
+
+util.inherits(BenchmarkClient, EventEmitter);
+
+function startAllClients(client_options_list, outstanding_rpcs_per_channel,
+ makeCall, emitter) {
+ _.each(client_options_list, function(client_options) {
+ _.times(outstanding_rpcs_per_channel, function() {
+ makeCall(client_options);
+ });
+ });
+}
+
+BenchmarkClient.prototype.startClosedLoop = function(
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
+ var self = this;
+
+ var options = {};
+
+ self.running = true;
+
+ if (rpc_type == 'UNARY') {
+ options.path = '/serviceProto.BenchmarkService.service/unaryCall';
+ } else {
+ self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
+ }
+
+ if (generic) {
+ self.emit('error', new Error('Generic client not supported'));
+ }
+
+ self.last_wall_time = process.hrtime();
+
+ var argument = {
+ response_size: resp_size,
+ payload: {
+ body: '0'.repeat(req_size)
+ }
+ };
+
+ function makeCall(client_options) {
+ if (self.running) {
+ self.pending_calls++;
+ var start_time = process.hrtime();
+ var req = self.request(client_options, function(res) {
+ var res_data = '';
+ res.on('data', function(data) {
+ res_data += data;
+ });
+ res.on('end', function() {
+ JSON.parse(res_data);
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(timeDiffToNanos(time_diff));
+ makeCall(client_options);
+ self.pending_calls--;
+ if ((!self.running) && self.pending_calls == 0) {
+ self.emit('finished');
+ }
+ });
+ });
+ req.write(JSON.stringify(argument));
+ req.end();
+ req.on('error', function(error) {
+ self.emit('error', new Error('Client error: ' + error.message));
+ self.running = false;
+ });
+ }
+ }
+
+ startAllClients(_.assign(options, self.client_options),
+ outstanding_rpcs_per_channel, makeCall, self);
+};
+
+BenchmarkClient.prototype.startPoisson = function(
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
+ generic) {
+ var self = this;
+
+ var options = {};
+
+ self.running = true;
+
+ if (rpc_type == 'UNARY') {
+ options.path = '/serviceProto.BenchmarkService.service/unaryCall';
+ } else {
+ self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
+ }
+
+ if (generic) {
+ self.emit('error', new Error('Generic client not supported'));
+ }
+
+ self.last_wall_time = process.hrtime();
+
+ var argument = {
+ response_size: resp_size,
+ payload: {
+ body: '0'.repeat(req_size)
+ }
+ };
+
+ function makeCall(client_options, poisson) {
+ if (self.running) {
+ self.pending_calls++;
+ var start_time = process.hrtime();
+ var req = self.request(client_options, function(res) {
+ var res_data = '';
+ res.on('data', function(data) {
+ res_data += data;
+ });
+ res.on('end', function() {
+ JSON.parse(res_data);
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(timeDiffToNanos(time_diff));
+ self.pending_calls--;
+ if ((!self.running) && self.pending_calls == 0) {
+ self.emit('finished');
+ }
+ });
+ });
+ req.write(JSON.stringify(argument));
+ req.end();
+ req.on('error', function(error) {
+ self.emit('error', new Error('Client error: ' + error.message));
+ self.running = false;
+ });
+ } else {
+ poisson.stop();
+ }
+ }
+
+ var averageIntervalMs = (1 / offered_load) * 1000;
+
+ startAllClients(_.assign(options, self.client_options),
+ outstanding_rpcs_per_channel, function(opts){
+ var p = PoissonProcess.create(averageIntervalMs, function() {
+ makeCall(opts, p);
+ });
+ p.start();
+ }, self);
+};
+
+/**
+ * Return curent statistics for the client. If reset is set, restart
+ * statistic collection.
+ * @param {boolean} reset Indicates that statistics should be reset
+ * @return {object} Client statistics
+ */
+BenchmarkClient.prototype.mark = function(reset) {
+ var wall_time_diff = process.hrtime(this.last_wall_time);
+ var histogram = this.histogram;
+ if (reset) {
+ this.last_wall_time = process.hrtime();
+ this.histogram = new Histogram(histogram.resolution,
+ histogram.max_possible);
+ }
+
+ return {
+ latencies: {
+ bucket: histogram.getContents(),
+ min_seen: histogram.minimum(),
+ max_seen: histogram.maximum(),
+ sum: histogram.getSum(),
+ sum_of_squares: histogram.sumOfSquares(),
+ count: histogram.getCount()
+ },
+ time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+ // Not sure how to measure these values
+ time_user: 0,
+ time_system: 0
+ };
+};
+
+/**
+ * Stop the clients.
+ * @param {function} callback Called when the clients have finished shutting
+ * down
+ */
+BenchmarkClient.prototype.stop = function(callback) {
+ this.running = false;
+ this.on('finished', callback);
+};
+
+module.exports = BenchmarkClient;
diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js
index 70cee9979b..6abde2e17a 100644
--- a/src/node/performance/benchmark_server.js
+++ b/src/node/performance/benchmark_server.js
@@ -40,6 +40,8 @@
var fs = require('fs');
var path = require('path');
+var EventEmitter = require('events');
+var util = require('util');
var genericService = require('./generic_service');
@@ -138,12 +140,15 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
this.server = server;
}
+util.inherits(BenchmarkServer, EventEmitter);
+
/**
* Start the benchmark server.
*/
BenchmarkServer.prototype.start = function() {
this.server.start();
this.last_wall_time = process.hrtime();
+ this.emit('started');
};
/**
diff --git a/src/node/performance/benchmark_server_express.js b/src/node/performance/benchmark_server_express.js
new file mode 100644
index 0000000000..07a559f022
--- /dev/null
+++ b/src/node/performance/benchmark_server_express.js
@@ -0,0 +1,107 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark server module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var http = require('http');
+var https = require('https');
+var EventEmitter = require('events');
+var util = require('util');
+
+var express = require('express');
+
+function unaryCall(req, res) {
+ var reqObj = JSON.parse(req.body);
+ var payload = {body: '0'.repeat(reqObj.response_size)};
+ res.send(JSON.dumps(payload));
+}
+
+function BenchmarkServer(host, port, tls, generic, response_size) {
+ var app = express();
+ app.put('/serviceProto.BenchmarkService.service/unaryCall', unaryCall);
+ this.input_host = host;
+ this.input_port = port;
+ if (tls) {
+ var credentials = {};
+ var key_path = path.join(__dirname, '../test/data/server1.key');
+ var pem_path = path.join(__dirname, '../test/data/server1.pem');
+
+ var key_data = fs.readFileSync(key_path);
+ var pem_data = fs.readFileSync(pem_path);
+ credentials['key'] = key_data;
+ credentials['cert'] = pem_data;
+ this.server = https.createServer(credentials, app);
+ } else {
+ this.server = http.createServer(app);
+ }
+}
+
+util.inherits(BenchmarkServer, EventEmitter);
+
+BenchmarkServer.prototype.start = function() {
+ var self = this;
+ this.server.listen(this.input_port, this.input_hostname, function() {
+ this.last_wall_time = process.hrtime();
+ self.emit('started');
+ });
+};
+
+BenchmarkServer.prototype.getPort = function() {
+ return this.server.address().port;
+};
+
+BenchmarkServer.prototype.mark = function(reset) {
+ var wall_time_diff = process.hrtime(this.last_wall_time);
+ if (reset) {
+ this.last_wall_time = process.hrtime();
+ }
+ return {
+ time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+ // Not sure how to measure these values
+ time_user: 0,
+ time_system: 0
+ };
+};
+
+BenchmarkServer.prototype.stop = function(callback) {
+ this.server.close(callback);
+};
+
+module.exports = BenchmarkServer;
diff --git a/src/node/performance/worker.js b/src/node/performance/worker.js
index 7ef9b84fe7..030bf7d7ba 100644
--- a/src/node/performance/worker.js
+++ b/src/node/performance/worker.js
@@ -34,18 +34,18 @@
'use strict';
var console = require('console');
-var worker_service_impl = require('./worker_service_impl');
+var WorkerServiceImpl = require('./worker_service_impl');
var grpc = require('../../../');
var serviceProto = grpc.load({
root: __dirname + '/../../..',
file: 'src/proto/grpc/testing/services.proto'}).grpc.testing;
-function runServer(port) {
+function runServer(port, benchmark_impl) {
var server_creds = grpc.ServerCredentials.createInsecure();
var server = new grpc.Server();
server.addProtoService(serviceProto.WorkerService.service,
- worker_service_impl);
+ new WorkerServiceImpl(benchmark_impl, server));
var address = '0.0.0.0:' + port;
server.bind(address, server_creds);
server.start();
@@ -57,9 +57,9 @@ if (require.main === module) {
Error.stackTraceLimit = Infinity;
var parseArgs = require('minimist');
var argv = parseArgs(process.argv, {
- string: ['driver_port']
+ string: ['driver_port', 'benchmark_impl']
});
- runServer(argv.driver_port);
+ runServer(argv.driver_port, argv.benchmark_impl);
}
exports.runServer = runServer;
diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js
index 4b5cb8f9c2..73dcb7afad 100644
--- a/src/node/performance/worker_service_impl.js
+++ b/src/node/performance/worker_service_impl.js
@@ -38,121 +38,141 @@ var console = require('console');
var BenchmarkClient = require('./benchmark_client');
var BenchmarkServer = require('./benchmark_server');
-exports.quitWorker = function quitWorker(call, callback) {
- callback(null, {});
- process.exit(0);
-}
+module.exports = function WorkerServiceImpl(benchmark_impl, server) {
+ var BenchmarkClient;
+ var BenchmarkServer;
+ switch (benchmark_impl) {
+ case 'grpc':
+ BenchmarkClient = require('./benchmark_client');
+ BenchmarkServer = require('./benchmark_server');
+ break;
+ case 'express':
+ BenchmarkClient = require('./benchmark_client_express');
+ BenchmarkServer = require('./benchmark_server_express');
+ break;
+ default:
+ throw new Error('Unrecognized benchmark impl: ' + benchmark_impl);
+ }
-exports.runClient = function runClient(call) {
- var client;
- call.on('data', function(request) {
- var stats;
- switch (request.argtype) {
- case 'setup':
- var setup = request.setup;
- console.log('ClientConfig %j', setup);
- client = new BenchmarkClient(setup.server_targets,
- setup.client_channels,
- setup.histogram_params,
- setup.security_params);
- client.on('error', function(error) {
- call.emit('error', error);
- });
- var req_size, resp_size, generic;
- switch (setup.payload_config.payload) {
- case 'bytebuf_params':
- req_size = setup.payload_config.bytebuf_params.req_size;
- resp_size = setup.payload_config.bytebuf_params.resp_size;
- generic = true;
+ this.quitWorker = function quitWorker(call, callback) {
+ server.tryShutdown(function() {
+ callback(null, {});
+ });
+ };
+
+ this.runClient = function runClient(call) {
+ var client;
+ call.on('data', function(request) {
+ var stats;
+ switch (request.argtype) {
+ case 'setup':
+ var setup = request.setup;
+ console.log('ClientConfig %j', setup);
+ client = new BenchmarkClient(setup.server_targets,
+ setup.client_channels,
+ setup.histogram_params,
+ setup.security_params);
+ client.on('error', function(error) {
+ call.emit('error', error);
+ });
+ var req_size, resp_size, generic;
+ switch (setup.payload_config.payload) {
+ case 'bytebuf_params':
+ req_size = setup.payload_config.bytebuf_params.req_size;
+ resp_size = setup.payload_config.bytebuf_params.resp_size;
+ generic = true;
+ break;
+ case 'simple_params':
+ req_size = setup.payload_config.simple_params.req_size;
+ resp_size = setup.payload_config.simple_params.resp_size;
+ generic = false;
+ break;
+ default:
+ call.emit('error', new Error('Unsupported PayloadConfig type' +
+ setup.payload_config.payload));
+ }
+ switch (setup.load_params.load) {
+ case 'closed_loop':
+ client.startClosedLoop(setup.outstanding_rpcs_per_channel,
+ setup.rpc_type, req_size, resp_size, generic);
+ break;
+ case 'poisson':
+ client.startPoisson(setup.outstanding_rpcs_per_channel,
+ setup.rpc_type, req_size, resp_size,
+ setup.load_params.poisson.offered_load, generic);
+ break;
+ default:
+ call.emit('error', new Error('Unsupported LoadParams type' +
+ setup.load_params.load));
+ }
+ stats = client.mark();
+ call.write({
+ stats: stats
+ });
break;
- case 'simple_params':
- req_size = setup.payload_config.simple_params.req_size;
- resp_size = setup.payload_config.simple_params.resp_size;
- generic = false;
+ case 'mark':
+ if (client) {
+ stats = client.mark(request.mark.reset);
+ call.write({
+ stats: stats
+ });
+ } else {
+ call.emit('error', new Error('Got Mark before ClientConfig'));
+ }
break;
default:
- call.emit('error', new Error('Unsupported PayloadConfig type' +
- setup.payload_config.payload));
+ throw new Error('Nonexistent client argtype option: ' + request.argtype);
}
- switch (setup.load_params.load) {
- case 'closed_loop':
- client.startClosedLoop(setup.outstanding_rpcs_per_channel,
- setup.rpc_type, req_size, resp_size, generic);
+ });
+ call.on('end', function() {
+ client.stop(function() {
+ call.end();
+ });
+ });
+ };
+
+ this.runServer = function runServer(call) {
+ var server;
+ call.on('data', function(request) {
+ var stats;
+ switch (request.argtype) {
+ case 'setup':
+ console.log('ServerConfig %j', request.setup);
+ server = new BenchmarkServer('[::]', request.setup.port,
+ request.setup.security_params);
+ server.start();
+ server.on('started', function() {
+ stats = server.mark();
+ call.write({
+ stats: stats,
+ port: server.getPort()
+ });
+ });
break;
- case 'poisson':
- client.startPoisson(setup.outstanding_rpcs_per_channel,
- setup.rpc_type, req_size, resp_size,
- setup.load_params.poisson.offered_load, generic);
+ case 'mark':
+ if (server) {
+ stats = server.mark(request.mark.reset);
+ call.write({
+ stats: stats,
+ port: server.getPort(),
+ cores: 1
+ });
+ } else {
+ call.emit('error', new Error('Got Mark before ServerConfig'));
+ }
break;
default:
- call.emit('error', new Error('Unsupported LoadParams type' +
- setup.load_params.load));
+ throw new Error('Nonexistent server argtype option');
}
- stats = client.mark();
- call.write({
- stats: stats
- });
- break;
- case 'mark':
- if (client) {
- stats = client.mark(request.mark.reset);
- call.write({
- stats: stats
- });
- } else {
- call.emit('error', new Error('Got Mark before ClientConfig'));
- }
- break;
- default:
- throw new Error('Nonexistent client argtype option: ' + request.argtype);
- }
- });
- call.on('end', function() {
- client.stop(function() {
- call.end();
});
- });
-};
-
-exports.runServer = function runServer(call) {
- var server;
- call.on('data', function(request) {
- var stats;
- switch (request.argtype) {
- case 'setup':
- console.log('ServerConfig %j', request.setup);
- server = new BenchmarkServer('[::]', request.setup.port,
- request.setup.security_params);
- server.start();
- stats = server.mark();
- call.write({
- stats: stats,
- port: server.getPort()
+ call.on('end', function() {
+ server.stop(function() {
+ call.end();
});
- break;
- case 'mark':
- if (server) {
- stats = server.mark(request.mark.reset);
- call.write({
- stats: stats,
- port: server.getPort(),
- cores: 1
- });
- } else {
- call.emit('error', new Error('Got Mark before ServerConfig'));
- }
- break;
- default:
- throw new Error('Nonexistent server argtype option');
- }
- });
- call.on('end', function() {
- server.stop(function() {
- call.end();
});
- });
-};
+ };
-exports.coreCount = function coreCount(call, callback) {
- callback(null, {cores: os.cpus().length});
+ this.coreCount = function coreCount(call, callback) {
+ callback(null, {cores: os.cpus().length});
+ };
};