aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2015-11-24 17:21:40 -0800
committerGravatar murgatroid99 <mlumish@google.com>2015-11-24 17:21:40 -0800
commitfb40b81cdacddaf772abf2aef61f248913300762 (patch)
tree3aeabe11228a79f6c39f483cf11b0cef18ecca7a /src/node
parent0592f72c4f39e6e4b4b0bef1323316ee97231322 (diff)
Added most of what's needed for QPS benchmark measurement
Diffstat (limited to 'src/node')
-rw-r--r--src/node/performance/benchmark_client.js287
-rw-r--r--src/node/performance/benchmark_server.js162
-rw-r--r--src/node/performance/histogram.js178
-rw-r--r--src/node/performance/worker_server.js60
-rw-r--r--src/node/performance/worker_service_impl.js114
5 files changed, 801 insertions, 0 deletions
diff --git a/src/node/performance/benchmark_client.js b/src/node/performance/benchmark_client.js
new file mode 100644
index 0000000000..cd14679126
--- /dev/null
+++ b/src/node/performance/benchmark_client.js
@@ -0,0 +1,287 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark client module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var _ = require('lodash');
+var PoissonProcess = require('poisson-process');
+var Histogram = require('./histogram');
+var grpc = require('../../../');
+var serviceProto = grpc.load({
+ root: __dirname + '/../../..',
+ file: 'test/proto/benchmarks/services.proto'}).grpc.testing;
+
+/**
+ * Create a buffer filled with size zeroes
+ * @param {number} size The length of the buffer
+ * @return {Buffer} The new buffer
+ */
+function zeroBuffer(size) {
+ var zeros = new Buffer(size);
+ zeros.fill(0);
+ return zeros;
+}
+
+/**
+ * The BenchmarkClient class. Opens channels to servers and makes RPCs based on
+ * parameters from the driver, and records statistics about those RPCs.
+ * @param {Array.<string>} server_targets List of servers to connect to
+ * @param {number} channels The total number of channels to open
+ * @param {Object} histogram_params Options for setting up the histogram
+ * @param {Object=} security_params Options for TLS setup. If absent, don't use
+ * TLS
+ */
+function BenchmarkClient(server_targets, channels, histogram_params,
+ security_params) {
+ var options = {};
+ var creds;
+ if (security_params) {
+ var ca_path;
+ if (security_params.use_test_ca) {
+ ca_path = path.join(__dirname, '../test/data/ca.pem');
+ var ca_data = fs.readFileSync(ca_path);
+ creds = grpc.credentials.createSsl(ca_data);
+ } else {
+ creds = grpc.credentials.createSsl();
+ }
+ if (security_params.server_host_override) {
+ var host_override = security_params.server_host_override;
+ options['grpc.ssl_target_name_override'] = host_override;
+ options['grpc.default_authority'] = host_override;
+ }
+ } else {
+ creds = grpc.credentials.createInsecure();
+ }
+
+ this.clients = [];
+
+ for (var i = 0; i < channels; i++) {
+ this.clients[i] = new serviceProto.BenchmarkService(
+ server_targets[i % server_targets.length], creds, options);
+ }
+
+ this.histogram = new Histogram(histogram_params.resolution,
+ histogram_params.max_possible);
+
+ this.running = false;
+};
+
+/**
+ * Start a closed-loop test. For each channel, start
+ * outstanding_rpcs_per_channel RPCs. Then, whenever an RPC finishes, start
+ * another one.
+ * @param {number} outstanding_rpcs_per_channel Number of RPCs to start per
+ * channel
+ * @param {string} rpc_type Which method to call. Should be 'UNARY' or
+ * 'STREAMING'
+ * @param {number} req_size The size of the payload to send with each request
+ * @param {number} resp_size The size of payload to request be sent in responses
+ */
+BenchmarkClient.prototype.startClosedLoop = function(
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size) {
+ var self = this;
+
+ self.running = true;
+
+ self.last_wall_time = process.hrtime();
+
+ var makeCall;
+
+ var argument = {
+ response_size: resp_size,
+ payload: {
+ body: zeroBuffer(req_size)
+ }
+ };
+
+ if (rpc_type == 'UNARY') {
+ makeCall = function(client) {
+ if (self.running) {
+ var start_time = process.hrtime();
+ client.unaryCall(argument, function(error, response) {
+ // Ignoring error for now
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(time_diff);
+ makeCall(client);
+ });
+ }
+ };
+ } else {
+ makeCall = function(client) {
+ if (self.running) {
+ var start_time = process.hrtime();
+ var call = client.streamingCall();
+ call.write(argument);
+ call.on('data', function() {
+ });
+ call.on('end', function() {
+ // Ignoring error for now
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(time_diff);
+ makeCall(client);
+ });
+ }
+ };
+ }
+
+ _.each(self.clients, function(client) {
+ _.times(outstanding_rpcs_per_channel, function() {
+ makeCall(client);
+ });
+ });
+};
+
+/**
+ * Start a poisson test. For each channel, this initiates a number of Poisson
+ * processes equal to outstanding_rpcs_per_channel, where each Poisson process
+ * has the load parameter offered_load.
+ * @param {number} outstanding_rpcs_per_channel Number of RPCs to start per
+ * channel
+ * @param {string} rpc_type Which method to call. Should be 'UNARY' or
+ * 'STREAMING'
+ * @param {number} req_size The size of the payload to send with each request
+ * @param {number} resp_size The size of payload to request be sent in responses
+ * @param {number} offered_load The load parameter for the Poisson process
+ */
+BenchmarkClient.prototype.startPoisson = function(
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load) {
+ var self = this;
+
+ self.running = true;
+
+ self.last_wall_time = process.hrtime();
+
+ var makeCall;
+
+ var argument = {
+ response_size: resp_size,
+ payload: {
+ body: zeroBuffer(req_size)
+ }
+ };
+
+ if (rpc_type == 'UNARY') {
+ makeCall = function(client, poisson) {
+ if (self.running) {
+ var start_time = process.hrtime();
+ client.unaryCall(argument, function(error, response) {
+ // Ignoring error for now
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(time_diff);
+ });
+ } else {
+ poisson.stop();
+ }
+ };
+ } else {
+ makeCall = function(client, poisson) {
+ if (self.running) {
+ var start_time = process.hrtime();
+ var call = client.streamingCall();
+ call.write(argument);
+ call.on('data', function() {
+ });
+ call.on('end', function() {
+ // Ignoring error for now
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(time_diff);
+ });
+ } else {
+ poisson.stop();
+ }
+ };
+ }
+
+ var averageIntervalMs = (1 / offered_load) * 1000;
+
+ _.each(self.clients, function(client) {
+ _.times(outstanding_rpcs_per_channel, function() {
+ var p = PoissonProcess.create(averageIntervalMs, function() {
+ makeCall(client, p);
+ });
+ p.start();
+ });
+ });
+};
+
+/**
+ * Return curent statistics for the client. If reset is set, restart
+ * statistic collection.
+ * @param {boolean} reset Indicates that statistics should be reset
+ * @return {object} Client statistics
+ */
+BenchmarkClient.prototype.mark = function(reset) {
+ var wall_time_diff = process.hrtime(this.last_wall_time);
+ var histogram = this.histogram;
+ if (reset) {
+ this.last_wall_time = process.hrtime();
+ this.histogram = new Histogram(histogram.resolution,
+ histogram.max_possible);
+ }
+
+ return {
+ latencies: {
+ bucket: histogram.getContents(),
+ min_seen: histogram.minimum(),
+ max_seen: histogram.maximum(),
+ sum: histogram.getSum(),
+ sum_of_squares: histogram.sumOfSquares(),
+ count: histogram.getCount()
+ },
+ time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+ // Not sure how to measure these values
+ time_user: 0,
+ time_system: 0
+ };
+};
+
+/**
+ * Stop the clients.
+ * @param {function} callback Called when the clients have finished shutting
+ * down
+ */
+BenchmarkClient.prototype.stop = function(callback) {
+ this.running = false;
+ /* TODO(murgatroid99): Figure out how to check that the clients have finished
+ * before calling this */
+ callback();
+};
+
+module.exports = BenchmarkClient;
diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js
new file mode 100644
index 0000000000..a94433afc4
--- /dev/null
+++ b/src/node/performance/benchmark_server.js
@@ -0,0 +1,162 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark server module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+
+var grpc = require('../../../');
+var serviceProto = grpc.load({
+ root: __dirname + '/../../..',
+ file: 'test/proto/benchmarks/services.proto'}).grpc.testing;
+
+/**
+ * Create a buffer filled with size zeroes
+ * @param {number} size The length of the buffer
+ * @return {Buffer} The new buffer
+ */
+function zeroBuffer(size) {
+ var zeros = new Buffer(size);
+ zeros.fill(0);
+ return zeros;
+}
+
+/**
+ * Handler for the unary benchmark method. Simply responds with a payload
+ * containing the requested number of zero bytes.
+ * @param {Call} call The call object to be handled
+ * @param {function} callback The callback to call with the response
+ */
+function unaryCall(call, callback) {
+ var req = call.request;
+ var payload = {body: zeroBuffer(req.response_size)};
+ callback(null, {payload: payload});
+}
+
+/**
+ * Handler for the streaming benchmark method. Simply responds to each request
+ * with a payload containing the requested number of zero bytes.
+ * @param {Call} call The call object to be handled
+ */
+function streamingCall(call) {
+ call.on('data', function(value) {
+ var payload = {body: zeroBuffer(value.repsonse_size)};
+ call.write({payload: payload});
+ });
+ call.on('end', function() {
+ call.end();
+ });
+}
+
+/**
+ * BenchmarkServer class. Constructed based on parameters from the driver and
+ * stores statistics.
+ * @param {string} host The host to serve on
+ * @param {number} port The port to listen to
+ * @param {tls} Indicates whether TLS should be used
+ */
+function BenchmarkServer(host, port, tls) {
+ var server_creds;
+ var host_override;
+ if (tls) {
+ var key_path = path.join(__dirname, '../test/data/server1.key');
+ var pem_path = path.join(__dirname, '../test/data/server1.pem');
+
+ var key_data = fs.readFileSync(key_path);
+ var pem_data = fs.readFileSync(pem_path);
+ server_creds = grpc.ServerCredentials.createSsl(null,
+ [{private_key: key_data,
+ cert_chain: pem_data}]);
+ } else {
+ server_creds = grpc.ServerCredentials.createInsecure();
+ }
+
+ var server = new Server();
+ this.port = server.bind(host + ':' + port, server_creds);
+ server.addProtoService(serviceProto.BenchmarkService.service, {
+ unaryCall: unaryCall,
+ streamingCall: streamingCall
+ });
+ this.server = server;
+}
+
+/**
+ * Start the benchmark server.
+ */
+BenchmarkServer.prototype.start = function() {
+ this.server.start();
+ this.last_wall_time = process.hrtime();
+};
+
+/**
+ * Return the port number that the server is bound to.
+ * @return {Number} The port number
+ */
+BenchmarkServer.prototype.getPort = function() {
+ return this.port;
+};
+
+/**
+ * Return current statistics for the server. If reset is set, restart
+ * statistic collection.
+ * @param {boolean} reset Indicates that statistics should be reset
+ * @return {object} Server statistics
+ */
+BenchmarkServer.prototype.mark = function(reset) {
+ var wall_time_diff = process.hrtime(this.last_wall_time);
+ if (reset) {
+ this.last_wall_time = process.hrtime();
+ }
+ return {
+ time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+ // Not sure how to measure these values
+ time_user: 0,
+ time_system: 0
+ };
+};
+
+/**
+ * Stop the server.
+ * @param {function} callback Called when the server has finished shutting down
+ */
+BenchmarkServer.prototype.stop = function(callback) {
+ this.server.tryShutdown(callback);
+};
+
+module.exports = BenchmarkServer;
diff --git a/src/node/performance/histogram.js b/src/node/performance/histogram.js
new file mode 100644
index 0000000000..f769266ae2
--- /dev/null
+++ b/src/node/performance/histogram.js
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Histogram module. Exports the Histogram class
+ * @module
+ */
+
+'use strict';
+
+/**
+ * Histogram class. Collects data and exposes a histogram and other statistics.
+ * This data structure is taken directly from src/core/support/histogram.c, but
+ * pared down to the statistics needed for client stats in
+ * test/proto/benchmarks/stats.proto.
+ * @constructor
+ * @param {number} resolution The histogram's bucket resolution
+ * @param {number} max_possible The maximum allowed value
+ */
+function Histogram(resolution, max_possible) {
+ this.resolution = resolution;
+ this.max_possible = max_possible;
+
+ this.sum = 0;
+ this.sum_of_squares = 0;
+ this.multiplier = 1 + resolution;
+ this.count = 0;
+ this.min_seen = max_possible;
+ this.max_seen = 0;
+ this.buckets = [];
+ for (var i = 0; i < this.bucketFor(max_possible) + 1; i++) {
+ this.buckets[i] = 0;
+ }
+}
+
+/**
+ * Get the bucket index for a given value.
+ * @param {number} value The value to check
+ * @return {number} The bucket index
+ */
+Histogram.prototype.bucketFor = function(value) {
+ return Math.floor(Math.log(value) / Math.log(this.multiplier));
+};
+
+/**
+ * Get the minimum value for a given bucket index
+ * @param {number} The bucket index to check
+ * @return {number} The minimum value for that bucket
+ */
+Histogram.prototype.bucketStart = function(index) {
+ return Math.pow(this.multiplier, index);
+};
+
+/**
+ * Add a value to the histogram. This updates all statistics with the new
+ * value. Those statistics should not be modified except with this function
+ * @param {number} value The value to add
+ */
+Histogram.prototype.add = function(value) {
+ this.sum += value;
+ this.sum_of_squares += value * value;
+ this.count++;
+ if (value < this.min_seen) {
+ this.min_seen = value;
+ }
+ if (value > this.max_seen) {
+ this.max_seen = value;
+ }
+ this.buckets[this.bucketFor(value)]++;
+};
+
+/**
+ * Get the mean of all added values
+ * @return {number} The mean
+ */
+Histogram.prototype.mean = function() {
+ return this.sum / this.count;
+};
+
+/**
+ * Get the variance of all added values. Used to calulate the standard deviation
+ * @return {number} The variance
+ */
+Histogram.prototype.variance = function() {
+ if (this.count == 0) {
+ return 0;
+ }
+ return (this.sum_of_squares * this.count - this.sum * this.sum) /
+ (this.count * this.count);
+};
+
+/**
+ * Get the standard deviation of all added values
+ * @return {number} The standard deviation
+ */
+Histogram.prototype.stddev = function() {
+ return Math.sqrt(this.variance);
+};
+
+/**
+ * Get the maximum among all added values
+ * @return {number} The maximum
+ */
+Histogram.prototype.maximum = function() {
+ return this.max_seen;
+};
+
+/**
+ * Get the minimum among all added values
+ * @return {number} The minimum
+ */
+Histogram.prototype.minimum = function() {
+ return this.min_seen;
+};
+
+/**
+ * Get the number of all added values
+ * @return {number} The count
+ */
+Histogram.prototype.getCount = function() {
+ return this.count;
+};
+
+/**
+ * Get the sum of all added values
+ * @return {number} The sum
+ */
+Histogram.prototype.getSum = function() {
+ return this.sum;
+};
+
+/**
+ * Get the sum of squares of all added values
+ * @return {number} The sum of squares
+ */
+Histogram.prototype.sumOfSquares = function() {
+ return this.sum_of_squares;
+};
+
+/**
+ * Get the raw histogram as a list of bucket sizes
+ * @return {Array.<number>} The buckets
+ */
+Histogram.prototype.getContents = function() {
+ return this.buckets;
+};
+
+module.exports = Histogram;
diff --git a/src/node/performance/worker_server.js b/src/node/performance/worker_server.js
new file mode 100644
index 0000000000..b7e638ac87
--- /dev/null
+++ b/src/node/performance/worker_server.js
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+'use strict';
+
+var worker_service_impl = require('./worker_service_impl');
+
+var grpc = require('../../../');
+var serviceProto = grpc.load({
+ root: __dirname + '/../../..',
+ file: 'test/proto/benchmarks/services.proto'}).grpc.testing;
+
+function runServer(port) {
+ var server_creds;
+ // Need to actually populate server_creds
+ var server = new grpc.Server();
+ server.addProtoService(serviceProto.WorkerService.service,
+ worker_service_impl);
+ server.bind('0.0.0.0:' + port, server_creds);
+ server.start();
+ return server;
+}
+
+if (require.main === module) {
+ var parseArgs = require('minimist');
+ var argv = parseArgs(process.argv, {
+ string: ['driver_port']
+ });
+ runServer(argv.driver_port);
+}
diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js
new file mode 100644
index 0000000000..c81ae15bd2
--- /dev/null
+++ b/src/node/performance/worker_service_impl.js
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+'use strict';
+
+var BenchmarkClient = require('./benchmark_client');
+var BenchmarkServer = require('./benchmark_server');
+
+exports.runClient = function runClient(call) {
+ var client;
+ call.on('data', function(request) {
+ switch (request.argtype) {
+ case 'setup':
+ var setup = request.setup;
+ client = new BenchmarkClient(setup.server_targets,
+ setup.client_channels,
+ setup.security_params,
+ setup.histogram_params);
+ switch (setup.load_params.load) {
+ case 'closed_loop':
+ client.startClosedLoop(setup.outstanding_rpcs_per_channel,
+ setup.rpc_type, setup.payload_config.req_size,
+ setup.payload_config.resp_size);
+ break;
+ case 'poisson':
+ client.startPoisson(setup.outstanding_rpcs_per_channel,
+ setup.rpc_type, setup.payload_config.req_size,
+ setup.payload_config.resp_size,
+ setup.load_params.poisson.offered_load);
+ break;
+ default:
+ call.emit('error', new Error('Unsupported LoadParams type' +
+ setup.load_params.load));
+ }
+ case 'mark':
+ if (client) {
+ var stats = client.mark(request.mark.reset);
+ call.write({
+ stats: stats
+ });
+ } else {
+ call.emit('error', new Error('Got Mark before ClientConfig'));
+ }
+ default:
+ throw new Error('Nonexistent client argtype option');
+ }
+ });
+ call.on('end', function() {
+ // TODO(murgatroid99): Ensure client is shutdown before calling call.end
+ client.stop();
+ call.end();
+ });
+};
+
+exports.runServer = function runServer(call) {
+ var server;
+ call.on('data', function(request) {
+ switch (request.argtype) {
+ case 'setup':
+ server = new BenchmarkServer(request.setup.host, request.setup.port,
+ request.setup.security_params);
+ server.start();
+ break;
+ case 'mark':
+ if (server) {
+ var stats = server.mark(request.mark.reset);
+ call.write({
+ stats: stats,
+ port: server.getPort()
+ });
+ } else {
+ call.emit('error', new Error('Got Mark befor ServerConfig'));
+ }
+ break;
+ default:
+ throw new Error('Nonexistent server argtype option');
+ }
+ });
+ call.on('end', function() {
+ server.stop(function() {
+ call.end();
+ });
+ });
+};