diff options
author | murgatroid99 <mlumish@google.com> | 2015-12-01 16:37:46 -0800 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2015-12-01 16:37:46 -0800 |
commit | 722f91038277520a6c564f4adb504552f7e2b328 (patch) | |
tree | d39de1c6d8ff07308dfedb6410cf515137b161eb /src/node | |
parent | fb40b81cdacddaf772abf2aef61f248913300762 (diff) |
Fixed some bugs in node benchmark service
Diffstat (limited to 'src/node')
-rw-r--r-- | src/node/performance/benchmark_client.js | 32 | ||||
-rw-r--r-- | src/node/performance/benchmark_server.js | 2 | ||||
-rw-r--r-- | src/node/performance/histogram.js | 4 | ||||
-rw-r--r-- | src/node/performance/worker_server.js | 9 | ||||
-rw-r--r-- | src/node/performance/worker_service_impl.js | 32 |
5 files changed, 59 insertions, 20 deletions
diff --git a/src/node/performance/benchmark_client.js b/src/node/performance/benchmark_client.js index cd14679126..cc5fe21ebd 100644 --- a/src/node/performance/benchmark_client.js +++ b/src/node/performance/benchmark_client.js @@ -40,6 +40,8 @@ var fs = require('fs'); var path = require('path'); +var util = require('util'); +var EventEmitter = require('events'); var _ = require('lodash'); var PoissonProcess = require('poisson-process'); var Histogram = require('./histogram'); @@ -101,8 +103,12 @@ function BenchmarkClient(server_targets, channels, histogram_params, histogram_params.max_possible); this.running = false; + + this.pending_calls = 0; }; +util.inherits(BenchmarkClient, EventEmitter); + /** * Start a closed-loop test. For each channel, start * outstanding_rpcs_per_channel RPCs. Then, whenever an RPC finishes, start @@ -134,28 +140,37 @@ BenchmarkClient.prototype.startClosedLoop = function( if (rpc_type == 'UNARY') { makeCall = function(client) { if (self.running) { + self.pending_calls++; var start_time = process.hrtime(); client.unaryCall(argument, function(error, response) { // Ignoring error for now var time_diff = process.hrtime(start_time); self.histogram.add(time_diff); makeCall(client); + self.pending_calls--; + if ((!self.running) && self.pending_calls == 0) { + self.emit('finished'); + } }); } }; } else { makeCall = function(client) { if (self.running) { + self.pending_calls++; var start_time = process.hrtime(); var call = client.streamingCall(); call.write(argument); call.on('data', function() { }); call.on('end', function() { - // Ignoring error for now var time_diff = process.hrtime(start_time); self.histogram.add(time_diff); makeCall(client); + self.pending_calls--; + if ((!self.running) && self.pending_calls == 0) { + self.emit('finished'); + } }); } }; @@ -200,11 +215,16 @@ BenchmarkClient.prototype.startPoisson = function( if (rpc_type == 'UNARY') { makeCall = function(client, poisson) { if (self.running) { + self.pending_calls++; var start_time = process.hrtime(); client.unaryCall(argument, function(error, response) { // Ignoring error for now var time_diff = process.hrtime(start_time); self.histogram.add(time_diff); + self.pending_calls--; + if ((!self.running) && self.pending_calls == 0) { + self.emit('finished'); + } }); } else { poisson.stop(); @@ -213,15 +233,19 @@ BenchmarkClient.prototype.startPoisson = function( } else { makeCall = function(client, poisson) { if (self.running) { + self.pending_calls++; var start_time = process.hrtime(); var call = client.streamingCall(); call.write(argument); call.on('data', function() { }); call.on('end', function() { - // Ignoring error for now var time_diff = process.hrtime(start_time); self.histogram.add(time_diff); + self.pending_calls--; + if ((!self.running) && self.pending_calls == 0) { + self.emit('finished'); + } }); } else { poisson.stop(); @@ -279,9 +303,7 @@ BenchmarkClient.prototype.mark = function(reset) { */ BenchmarkClient.prototype.stop = function(callback) { this.running = false; - /* TODO(murgatroid99): Figure out how to check that the clients have finished - * before calling this */ - callback(); + self.on('finished', callback); }; module.exports = BenchmarkClient; diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js index a94433afc4..ac96fc5edb 100644 --- a/src/node/performance/benchmark_server.js +++ b/src/node/performance/benchmark_server.js @@ -107,7 +107,7 @@ function BenchmarkServer(host, port, tls) { server_creds = grpc.ServerCredentials.createInsecure(); } - var server = new Server(); + var server = new grpc.Server(); this.port = server.bind(host + ':' + port, server_creds); server.addProtoService(serviceProto.BenchmarkService.service, { unaryCall: unaryCall, diff --git a/src/node/performance/histogram.js b/src/node/performance/histogram.js index f769266ae2..45e1c23a90 100644 --- a/src/node/performance/histogram.js +++ b/src/node/performance/histogram.js @@ -44,8 +44,8 @@ * pared down to the statistics needed for client stats in * test/proto/benchmarks/stats.proto. * @constructor - * @param {number} resolution The histogram's bucket resolution - * @param {number} max_possible The maximum allowed value + * @param {number} resolution The histogram's bucket resolution. Must be positive + * @param {number} max_possible The maximum allowed value. Must be greater than 1 */ function Histogram(resolution, max_possible) { this.resolution = resolution; diff --git a/src/node/performance/worker_server.js b/src/node/performance/worker_server.js index b7e638ac87..43b86e5ecf 100644 --- a/src/node/performance/worker_server.js +++ b/src/node/performance/worker_server.js @@ -41,20 +41,23 @@ var serviceProto = grpc.load({ file: 'test/proto/benchmarks/services.proto'}).grpc.testing; function runServer(port) { - var server_creds; - // Need to actually populate server_creds + var server_creds = grpc.ServerCredentials.createInsecure(); var server = new grpc.Server(); server.addProtoService(serviceProto.WorkerService.service, worker_service_impl); - server.bind('0.0.0.0:' + port, server_creds); + var address = '0.0.0.0:' + port; + server.bind(address, server_creds); server.start(); return server; } if (require.main === module) { + Error.stackTraceLimit = Infinity; var parseArgs = require('minimist'); var argv = parseArgs(process.argv, { string: ['driver_port'] }); runServer(argv.driver_port); } + +exports.runServer = runServer; diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js index c81ae15bd2..6de6e7fca5 100644 --- a/src/node/performance/worker_service_impl.js +++ b/src/node/performance/worker_service_impl.js @@ -39,18 +39,20 @@ var BenchmarkServer = require('./benchmark_server'); exports.runClient = function runClient(call) { var client; call.on('data', function(request) { + var stats; switch (request.argtype) { case 'setup': var setup = request.setup; client = new BenchmarkClient(setup.server_targets, setup.client_channels, - setup.security_params, - setup.histogram_params); + setup.histogram_params, + setup.security_params); switch (setup.load_params.load) { case 'closed_loop': client.startClosedLoop(setup.outstanding_rpcs_per_channel, - setup.rpc_type, setup.payload_config.req_size, - setup.payload_config.resp_size); + setup.rpc_type, + setup.payload_config.simple_params.req_size, + setup.payload_config.simple_params.resp_size); break; case 'poisson': client.startPoisson(setup.outstanding_rpcs_per_channel, @@ -62,9 +64,15 @@ exports.runClient = function runClient(call) { call.emit('error', new Error('Unsupported LoadParams type' + setup.load_params.load)); } + stats = client.mark(); + console.log(stats); + call.write({ + stats: stats + }); + break; case 'mark': if (client) { - var stats = client.mark(request.mark.reset); + stats = client.mark(request.mark.reset); call.write({ stats: stats }); @@ -76,24 +84,30 @@ exports.runClient = function runClient(call) { } }); call.on('end', function() { - // TODO(murgatroid99): Ensure client is shutdown before calling call.end - client.stop(); - call.end(); + client.stop(function() { + call.end(); + }); }); }; exports.runServer = function runServer(call) { var server; call.on('data', function(request) { + var stats; switch (request.argtype) { case 'setup': server = new BenchmarkServer(request.setup.host, request.setup.port, request.setup.security_params); server.start(); + stats = server.mark(); + call.write({ + stats: stats, + port: server.getPort() + }); break; case 'mark': if (server) { - var stats = server.mark(request.mark.reset); + stats = server.mark(request.mark.reset); call.write({ stats: stats, port: server.getPort() |