aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node
diff options
context:
space:
mode:
Diffstat (limited to 'src/node')
-rw-r--r--src/node/README.md18
-rw-r--r--src/node/ext/call_credentials.cc81
-rw-r--r--src/node/ext/call_credentials.h18
-rw-r--r--src/node/performance/benchmark_client.js60
-rw-r--r--src/node/performance/benchmark_server.js34
-rw-r--r--src/node/performance/generic_service.js46
-rw-r--r--src/node/performance/worker_service_impl.js25
-rw-r--r--src/node/src/credentials.js2
-rw-r--r--src/node/src/server.js6
-rw-r--r--src/node/test/surface_test.js8
10 files changed, 225 insertions, 73 deletions
diff --git a/src/node/README.md b/src/node/README.md
index 3501b54a66..15d4c6d02f 100644
--- a/src/node/README.md
+++ b/src/node/README.md
@@ -7,6 +7,8 @@ Beta
## PREREQUISITES
- `node`: This requires `node` to be installed, version `0.12` or above. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package.
+- **Note:** If you installed `node` via a package manager and the version is still less than `0.12`, try directly installing it from [nodejs.org](https://nodejs.org).
+
## INSTALLATION
Install the gRPC NPM package
@@ -17,7 +19,21 @@ npm install grpc
## BUILD FROM SOURCE
1. Clone [the grpc Git Repository](https://github.com/grpc/grpc).
- 3. Run `npm install`.
+ 2. Run `npm install` from the repository root.
+
+ - **Note:** On Windows, this might fail due to [nodejs issue #4932](https://github.com/nodejs/node/issues/4932) in which case, you will see something like the following in `npm install`'s output (towards the very beginning):
+
+ ```
+ ..
+ Building the projects in this solution one at a time. To enable parallel build, please add the "/m" switch.
+ WINDOWS_BUILD_WARNING
+ "..\IMPORTANT: Due to https:\github.com\nodejs\node\issues\4932, to build this library on Windows, you must first remove C:\Users\jenkins\.node-gyp\4.4.0\include\node\openssl"
+ ...
+ ..
+ ```
+
+ To fix this, you will have to delete the folder `C:\Users\<username>\.node-gyp\<node_version>\include\node\openssl` and retry `npm install`
+
## TESTING
To run the test suite, simply run `npm test` in the install location.
diff --git a/src/node/ext/call_credentials.cc b/src/node/ext/call_credentials.cc
index 98696db232..bd2d146bbc 100644
--- a/src/node/ext/call_credentials.cc
+++ b/src/node/ext/call_credentials.cc
@@ -35,6 +35,8 @@
#include <nan.h>
#include <uv.h>
+#include <list>
+
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "grpc/support/log.h"
@@ -161,6 +163,15 @@ NAN_METHOD(CallCredentials::CreateFromPlugin) {
grpc_metadata_credentials_plugin plugin;
plugin_state *state = new plugin_state;
state->callback = new Nan::Callback(info[0].As<Function>());
+ state->pending_callbacks = new std::list<plugin_callback_data*>();
+ uv_mutex_init(&state->plugin_mutex);
+ uv_async_init(uv_default_loop(),
+ &state->plugin_async,
+ SendPluginCallback);
+ uv_unref((uv_handle_t*)&state->plugin_async);
+
+ state->plugin_async.data = state;
+
plugin.get_metadata = plugin_get_metadata;
plugin.destroy = plugin_destroy_state;
plugin.state = reinterpret_cast<void*>(state);
@@ -208,48 +219,60 @@ NAN_METHOD(PluginCallback) {
NAUV_WORK_CB(SendPluginCallback) {
Nan::HandleScope scope;
- plugin_callback_data *data = reinterpret_cast<plugin_callback_data*>(
- async->data);
- // Attach cb and user_data to plugin_callback so that it can access them later
- v8::Local<v8::Function> plugin_callback = Nan::GetFunction(
- Nan::New<v8::FunctionTemplate>(PluginCallback)).ToLocalChecked();
- Nan::Set(plugin_callback, Nan::New("cb").ToLocalChecked(),
- Nan::New<v8::External>(reinterpret_cast<void*>(data->cb)));
- Nan::Set(plugin_callback, Nan::New("user_data").ToLocalChecked(),
- Nan::New<v8::External>(data->user_data));
- const int argc = 2;
- v8::Local<v8::Value> argv[argc] = {
- Nan::New(data->service_url).ToLocalChecked(),
- plugin_callback
- };
- Nan::Callback *callback = data->state->callback;
- callback->Call(argc, argv);
- delete data;
- uv_unref((uv_handle_t *)async);
- uv_close((uv_handle_t *)async, (uv_close_cb)free);
+ plugin_state *state = reinterpret_cast<plugin_state*>(async->data);
+ std::list<plugin_callback_data*> callbacks;
+ uv_mutex_lock(&state->plugin_mutex);
+ callbacks.splice(callbacks.begin(), *state->pending_callbacks);
+ uv_mutex_unlock(&state->plugin_mutex);
+ while (!callbacks.empty()) {
+ plugin_callback_data *data = callbacks.front();
+ callbacks.pop_front();
+ // Attach cb and user_data to plugin_callback so that it can access them later
+ v8::Local<v8::Function> plugin_callback = Nan::GetFunction(
+ Nan::New<v8::FunctionTemplate>(PluginCallback)).ToLocalChecked();
+ Nan::Set(plugin_callback, Nan::New("cb").ToLocalChecked(),
+ Nan::New<v8::External>(reinterpret_cast<void*>(data->cb)));
+ Nan::Set(plugin_callback, Nan::New("user_data").ToLocalChecked(),
+ Nan::New<v8::External>(data->user_data));
+ const int argc = 2;
+ v8::Local<v8::Value> argv[argc] = {
+ Nan::New(data->service_url).ToLocalChecked(),
+ plugin_callback
+ };
+ Nan::Callback *callback = state->callback;
+ callback->Call(argc, argv);
+ delete data;
+ }
}
void plugin_get_metadata(void *state, grpc_auth_metadata_context context,
grpc_credentials_plugin_metadata_cb cb,
void *user_data) {
- uv_async_t *async = static_cast<uv_async_t*>(malloc(sizeof(uv_async_t)));
- uv_async_init(uv_default_loop(),
- async,
- SendPluginCallback);
+ plugin_state *p_state = reinterpret_cast<plugin_state*>(state);
plugin_callback_data *data = new plugin_callback_data;
- data->state = reinterpret_cast<plugin_state*>(state);
data->service_url = context.service_url;
data->cb = cb;
data->user_data = user_data;
- async->data = data;
- /* libuv says that it will coalesce calls to uv_async_send. If there is ever a
- * problem with a callback not getting called, that is probably the reason */
- uv_async_send(async);
+
+ uv_mutex_lock(&p_state->plugin_mutex);
+ p_state->pending_callbacks->push_back(data);
+ uv_mutex_unlock(&p_state->plugin_mutex);
+
+ uv_async_send(&p_state->plugin_async);
+}
+
+void plugin_uv_close_cb(uv_handle_t *handle) {
+ uv_async_t *async = reinterpret_cast<uv_async_t*>(handle);
+ plugin_state *state = reinterpret_cast<plugin_state *>(async->data);
+ uv_mutex_destroy(&state->plugin_mutex);
+ delete state->pending_callbacks;
+ delete state->callback;
+ delete state;
}
void plugin_destroy_state(void *ptr) {
plugin_state *state = reinterpret_cast<plugin_state *>(ptr);
- delete state->callback;
+ uv_close((uv_handle_t*)&state->plugin_async, plugin_uv_close_cb);
}
} // namespace node
diff --git a/src/node/ext/call_credentials.h b/src/node/ext/call_credentials.h
index a9bfe30f94..1f35595f3d 100644
--- a/src/node/ext/call_credentials.h
+++ b/src/node/ext/call_credentials.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,8 +34,11 @@
#ifndef GRPC_NODE_CALL_CREDENTIALS_H_
#define GRPC_NODE_CALL_CREDENTIALS_H_
+#include <list>
+
#include <node.h>
#include <nan.h>
+#include <uv.h>
#include "grpc/grpc_security.h"
namespace grpc {
@@ -73,17 +76,20 @@ class CallCredentials : public Nan::ObjectWrap {
/* Auth metadata plugin functionality */
-typedef struct plugin_state {
- Nan::Callback *callback;
-} plugin_state;
-
typedef struct plugin_callback_data {
- plugin_state *state;
const char *service_url;
grpc_credentials_plugin_metadata_cb cb;
void *user_data;
} plugin_callback_data;
+typedef struct plugin_state {
+ Nan::Callback *callback;
+ std::list<plugin_callback_data*> *pending_callbacks;
+ uv_mutex_t plugin_mutex;
+ // async.data == this
+ uv_async_t plugin_async;
+} plugin_state;
+
void plugin_get_metadata(void *state, grpc_auth_metadata_context context,
grpc_credentials_plugin_metadata_cb cb,
void *user_data);
diff --git a/src/node/performance/benchmark_client.js b/src/node/performance/benchmark_client.js
index 620aecde97..80bec0b73e 100644
--- a/src/node/performance/benchmark_client.js
+++ b/src/node/performance/benchmark_client.js
@@ -45,6 +45,9 @@ var EventEmitter = require('events');
var _ = require('lodash');
var PoissonProcess = require('poisson-process');
var Histogram = require('./histogram');
+
+var genericService = require('./generic_service');
+
var grpc = require('../../../');
var serviceProto = grpc.load({
root: __dirname + '/../../..',
@@ -104,10 +107,14 @@ function BenchmarkClient(server_targets, channels, histogram_params,
}
this.clients = [];
+ var GenericClient = grpc.makeGenericClientConstructor(genericService);
+ this.genericClients = [];
for (var i = 0; i < channels; i++) {
this.clients[i] = new serviceProto.BenchmarkService(
server_targets[i % server_targets.length], creds, options);
+ this.genericClients[i] = new GenericClient(
+ server_targets[i % server_targets.length], creds, options);
}
this.histogram = new Histogram(histogram_params.resolution,
@@ -130,9 +137,11 @@ util.inherits(BenchmarkClient, EventEmitter);
* 'STREAMING'
* @param {number} req_size The size of the payload to send with each request
* @param {number} resp_size The size of payload to request be sent in responses
+ * @param {boolean} generic Indicates that the generic (non-proto) clients
+ * should be used
*/
BenchmarkClient.prototype.startClosedLoop = function(
- outstanding_rpcs_per_channel, rpc_type, req_size, resp_size) {
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
var self = this;
self.running = true;
@@ -141,12 +150,20 @@ BenchmarkClient.prototype.startClosedLoop = function(
var makeCall;
- var argument = {
- response_size: resp_size,
- payload: {
- body: zeroBuffer(req_size)
- }
- };
+ var argument;
+ var client_list;
+ if (generic) {
+ argument = zeroBuffer(req_size);
+ client_list = self.genericClients;
+ } else {
+ argument = {
+ response_size: resp_size,
+ payload: {
+ body: zeroBuffer(req_size)
+ }
+ };
+ client_list = self.clients;
+ }
if (rpc_type == 'UNARY') {
makeCall = function(client) {
@@ -195,7 +212,7 @@ BenchmarkClient.prototype.startClosedLoop = function(
};
}
- _.each(self.clients, function(client) {
+ _.each(client_list, function(client) {
_.times(outstanding_rpcs_per_channel, function() {
makeCall(client);
});
@@ -213,9 +230,12 @@ BenchmarkClient.prototype.startClosedLoop = function(
* @param {number} req_size The size of the payload to send with each request
* @param {number} resp_size The size of payload to request be sent in responses
* @param {number} offered_load The load parameter for the Poisson process
+ * @param {boolean} generic Indicates that the generic (non-proto) clients
+ * should be used
*/
BenchmarkClient.prototype.startPoisson = function(
- outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load) {
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
+ generic) {
var self = this;
self.running = true;
@@ -224,12 +244,20 @@ BenchmarkClient.prototype.startPoisson = function(
var makeCall;
- var argument = {
- response_size: resp_size,
- payload: {
- body: zeroBuffer(req_size)
- }
- };
+ var argument;
+ var client_list;
+ if (generic) {
+ argument = zeroBuffer(req_size);
+ client_list = self.genericClients;
+ } else {
+ argument = {
+ response_size: resp_size,
+ payload: {
+ body: zeroBuffer(req_size)
+ }
+ };
+ client_list = self.clients;
+ }
if (rpc_type == 'UNARY') {
makeCall = function(client, poisson) {
@@ -282,7 +310,7 @@ BenchmarkClient.prototype.startPoisson = function(
var averageIntervalMs = (1 / offered_load) * 1000;
- _.each(self.clients, function(client) {
+ _.each(client_list, function(client) {
_.times(outstanding_rpcs_per_channel, function() {
var p = PoissonProcess.create(averageIntervalMs, function() {
makeCall(client, p);
diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js
index e48acd48f5..b1b0bd12ab 100644
--- a/src/node/performance/benchmark_server.js
+++ b/src/node/performance/benchmark_server.js
@@ -41,6 +41,8 @@
var fs = require('fs');
var path = require('path');
+var genericService = require('./generic_service');
+
var grpc = require('../../../');
var serviceProto = grpc.load({
root: __dirname + '/../../..',
@@ -84,14 +86,28 @@ function streamingCall(call) {
});
}
+function makeStreamingGenericCall(response_size) {
+ var response = zeroBuffer(response_size);
+ return function streamingGenericCall(call) {
+ call.on('data', function(value) {
+ call.write(response);
+ });
+ call.on('end', function() {
+ call.end();
+ });
+ };
+}
+
/**
* BenchmarkServer class. Constructed based on parameters from the driver and
* stores statistics.
* @param {string} host The host to serve on
* @param {number} port The port to listen to
- * @param {tls} Indicates whether TLS should be used
+ * @param {boolean} tls Indicates whether TLS should be used
+ * @param {boolean} generic Indicates whether to use the generic service
+ * @param {number=} response_size The response size for the generic service
*/
-function BenchmarkServer(host, port, tls) {
+function BenchmarkServer(host, port, tls, generic, response_size) {
var server_creds;
var host_override;
if (tls) {
@@ -109,10 +125,16 @@ function BenchmarkServer(host, port, tls) {
var server = new grpc.Server();
this.port = server.bind(host + ':' + port, server_creds);
- server.addProtoService(serviceProto.BenchmarkService.service, {
- unaryCall: unaryCall,
- streamingCall: streamingCall
- });
+ if (generic) {
+ server.addService(genericService, {
+ streamingCall: makeStreamingGenericCall(response_size)
+ });
+ } else {
+ server.addProtoService(serviceProto.BenchmarkService.service, {
+ unaryCall: unaryCall,
+ streamingCall: streamingCall
+ });
+ }
this.server = server;
}
diff --git a/src/node/performance/generic_service.js b/src/node/performance/generic_service.js
new file mode 100644
index 0000000000..ce09cc4336
--- /dev/null
+++ b/src/node/performance/generic_service.js
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+var _ = require('lodash');
+
+module.exports = {
+ 'streamingCall' : {
+ path: '/grpc.testing/BenchmarkService',
+ requestStream: true,
+ responseStream: true,
+ requestSerialize: _.identity,
+ requestDeserialize: _.identity,
+ responseSerialize: _.identity,
+ responseDeserialize: _.identity
+ }
+};
diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js
index 1439249878..2c4651370f 100644
--- a/src/node/performance/worker_service_impl.js
+++ b/src/node/performance/worker_service_impl.js
@@ -56,18 +56,31 @@ exports.runClient = function runClient(call) {
client.on('error', function(error) {
call.emit('error', error);
});
+ var req_size, resp_size, generic;
+ switch (setup.payload_config.payload) {
+ case 'bytebuf_params':
+ req_size = setup.payload_config.bytebuf_params.req_size;
+ resp_size = setup.payload_config.bytebuf_params.resp_size;
+ generic = true;
+ break;
+ case 'simple_params':
+ req_size = setup.payload_config.simple_params.req_size;
+ resp_size = setup.payload_config.simple_params.resp_size;
+ generic = false;
+ break;
+ default:
+ call.emit('error', new Error('Unsupported PayloadConfig type' +
+ setup.payload_config.payload));
+ }
switch (setup.load_params.load) {
case 'closed_loop':
client.startClosedLoop(setup.outstanding_rpcs_per_channel,
- setup.rpc_type,
- setup.payload_config.simple_params.req_size,
- setup.payload_config.simple_params.resp_size);
+ setup.rpc_type, req_size, resp_size, generic);
break;
case 'poisson':
client.startPoisson(setup.outstanding_rpcs_per_channel,
- setup.rpc_type, setup.payload_config.req_size,
- setup.payload_config.resp_size,
- setup.load_params.poisson.offered_load);
+ setup.rpc_type, req_size, resp_size,
+ setup.load_params.poisson.offered_load, generic);
break;
default:
call.emit('error', new Error('Unsupported LoadParams type' +
diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js
index 1d73723cc0..97c4bd73ac 100644
--- a/src/node/src/credentials.js
+++ b/src/node/src/credentials.js
@@ -118,7 +118,6 @@ exports.createFromMetadataGenerator = function(metadata_generator) {
exports.createFromGoogleCredential = function(google_credential) {
return exports.createFromMetadataGenerator(function(auth_context, callback) {
var service_url = auth_context.service_url;
- console.log('Service URL:', service_url);
google_credential.getRequestMetadata(service_url, function(err, header) {
if (err) {
console.log('Auth error:', err);
@@ -127,7 +126,6 @@ exports.createFromGoogleCredential = function(google_credential) {
}
var metadata = new Metadata();
metadata.add('authorization', header.Authorization);
- console.log(header.Authorization);
callback(null, metadata);
});
});
diff --git a/src/node/src/server.js b/src/node/src/server.js
index 0cf7ba3424..dd0bc12bc9 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -339,7 +339,7 @@ function _read(size) {
try {
deserialized = self.deserialize(data);
} catch (e) {
- e.code = grpc.status.INVALID_ARGUMENT;
+ e.code = grpc.status.INTERNAL;
self.emit('error', e);
return;
}
@@ -475,7 +475,7 @@ function handleUnary(call, handler, metadata) {
try {
emitter.request = handler.deserialize(result.read);
} catch (e) {
- e.code = grpc.status.INVALID_ARGUMENT;
+ e.code = grpc.status.INTERNAL;
handleError(call, e);
return;
}
@@ -516,7 +516,7 @@ function handleServerStreaming(call, handler, metadata) {
try {
stream.request = handler.deserialize(result.read);
} catch (e) {
- e.code = grpc.status.INVALID_ARGUMENT;
+ e.code = grpc.status.INTERNAL;
stream.emit('error', e);
return;
}
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 8a232d6fc4..edbfc0a288 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -709,14 +709,14 @@ describe('Other conditions', function() {
it('should respond correctly to a unary call', function(done) {
misbehavingClient.unary(badArg, function(err, data) {
assert(err);
- assert.strictEqual(err.code, grpc.status.INVALID_ARGUMENT);
+ assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
});
it('should respond correctly to a client stream', function(done) {
var call = misbehavingClient.clientStream(function(err, data) {
assert(err);
- assert.strictEqual(err.code, grpc.status.INVALID_ARGUMENT);
+ assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
call.write(badArg);
@@ -729,7 +729,7 @@ describe('Other conditions', function() {
assert.fail(data, null, 'Unexpected data', '===');
});
call.on('error', function(err) {
- assert.strictEqual(err.code, grpc.status.INVALID_ARGUMENT);
+ assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
});
@@ -739,7 +739,7 @@ describe('Other conditions', function() {
assert.fail(data, null, 'Unexpected data', '===');
});
call.on('error', function(err) {
- assert.strictEqual(err.code, grpc.status.INVALID_ARGUMENT);
+ assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
call.write(badArg);