diff options
author | 2015-01-29 14:17:51 -0800 | |
---|---|---|
committer | 2015-01-29 14:17:51 -0800 | |
commit | 502196669bc6a7bc7d218b7b55a3b55eee728d80 (patch) | |
tree | 4247e05234ff378dbe3a4898d9e9f4949ff0ae7e | |
parent | d87c9ea3c9aec16fff5f52dd221d728701c78dcf (diff) | |
parent | 4d2d0f0f3acd1e90037524803e7cb6b445f9c676 (diff) |
Merge pull request #283 from murgatroid99/node_server_metadata
Add an API for servers to send metadata to clients
-rw-r--r-- | src/node/interop/interop_server.js | 2 | ||||
-rw-r--r-- | src/node/src/server.js | 10 | ||||
-rw-r--r-- | src/node/src/surface_server.js | 43 | ||||
-rw-r--r-- | src/node/test/client_server_test.js | 28 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 2 |
5 files changed, 56 insertions, 29 deletions
diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index ebf847876c..54e9715d1e 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -183,7 +183,7 @@ function getServer(port, tls) { fullDuplexCall: handleFullDuplex, halfDuplexCall: handleHalfDuplex } - }, options); + }, null, options); var port_num = server.bind('0.0.0.0:' + port, tls); return {server: server, port: port_num}; } diff --git a/src/node/src/server.js b/src/node/src/server.js index 03cdbe6f98..a5d737c68d 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -202,10 +202,13 @@ GrpcServerStream.prototype._write = function(chunk, encoding, callback) { * Constructs a server object that stores request handlers and delegates * incoming requests to those handlers * @constructor - * @param {Array} options Options that should be passed to the internal server + * @param {function(string, Object<string, Array<Buffer>>): + Object<string, Array<Buffer|string>>=} getMetadata Callback that gets + * metatada for a given method + * @param {Object=} options Options that should be passed to the internal server * implementation */ -function Server(options) { +function Server(getMetadata, options) { this.handlers = {}; var handlers = this.handlers; var server = new grpc.Server(options); @@ -249,6 +252,9 @@ function Server(options) { stream.emit('cancelled'); } }, 0); + if (getMetadata) { + call.addMetadata(getMetadata(data.method, data.metadata)); + } call.serverEndInitialMetadata(0); var stream = new GrpcServerStream(call, handler.serialize, handler.deserialize); diff --git a/src/node/src/surface_server.js b/src/node/src/surface_server.js index af23ec211c..a47d1fa23d 100644 --- a/src/node/src/surface_server.js +++ b/src/node/src/surface_server.js @@ -129,16 +129,18 @@ ServerWritableObjectStream.prototype._write = _write; /** * Creates a binary stream handler function from a unary handler function - * @param {function(Object, function(Error, *))} handler Unary call handler - * @return {function(stream)} Binary stream handler + * @param {function(Object, function(Error, *), metadata=)} handler Unary call + * handler + * @return {function(stream, metadata=)} Binary stream handler */ function makeUnaryHandler(handler) { /** * Handles a stream by reading a single data value, passing it to the handler, * and writing the response back to the stream. * @param {stream} stream Binary data stream + * @param {metadata=} metadata Incoming metadata array */ - return function handleUnaryCall(stream) { + return function handleUnaryCall(stream, metadata) { stream.on('data', function handleUnaryData(value) { var call = {request: value}; Object.defineProperty(call, 'cancelled', { @@ -154,7 +156,7 @@ function makeUnaryHandler(handler) { stream.write(value); stream.end(); } - }); + }, metadata); }); }; } @@ -162,17 +164,18 @@ function makeUnaryHandler(handler) { /** * Creates a binary stream handler function from a client stream handler * function - * @param {function(Readable, function(Error, *))} handler Client stream call - * handler - * @return {function(stream)} Binary stream handler + * @param {function(Readable, function(Error, *), metadata=)} handler Client + * stream call handler + * @return {function(stream, metadata=)} Binary stream handler */ function makeClientStreamHandler(handler) { /** * Handles a stream by passing a deserializing stream to the handler and * writing the response back to the stream. * @param {stream} stream Binary data stream + * @param {metadata=} metadata Incoming metadata array */ - return function handleClientStreamCall(stream) { + return function handleClientStreamCall(stream, metadata) { var object_stream = new ServerReadableObjectStream(stream); handler(object_stream, function sendClientStreamData(err, value) { if (err) { @@ -181,35 +184,36 @@ function makeClientStreamHandler(handler) { stream.write(value); stream.end(); } - }); + }, metadata); }; } /** * Creates a binary stream handler function from a server stream handler * function - * @param {function(Writable)} handler Server stream call handler - * @return {function(stream)} Binary stream handler + * @param {function(Writable, metadata=)} handler Server stream call handler + * @return {function(stream, metadata=)} Binary stream handler */ function makeServerStreamHandler(handler) { /** * Handles a stream by attaching it to a serializing stream, and passing it to * the handler. * @param {stream} stream Binary data stream + * @param {metadata=} metadata Incoming metadata array */ - return function handleServerStreamCall(stream) { + return function handleServerStreamCall(stream, metadata) { stream.on('data', function handleClientData(value) { var object_stream = new ServerWritableObjectStream(stream); object_stream.request = value; - handler(object_stream); + handler(object_stream, metadata); }); }; } /** * Creates a binary stream handler function from a bidi stream handler function - * @param {function(Duplex)} handler Unary call handler - * @return {function(stream)} Binary stream handler + * @param {function(Duplex, metadata=)} handler Unary call handler + * @return {function(stream, metadata=)} Binary stream handler */ function makeBidiStreamHandler(handler) { return handler; @@ -252,10 +256,13 @@ function makeServerConstructor(services) { * @constructor * @param {Object} service_handlers Map from service names to map from method * names to handlers - * @param {Object} options Options to pass to the underlying server + * @param {function(string, Object<string, Array<Buffer>>): + Object<string, Array<Buffer|string>>=} getMetadata Callback that + * gets metatada for a given method + * @param {Object=} options Options to pass to the underlying server */ - function SurfaceServer(service_handlers, options) { - var server = new Server(options); + function SurfaceServer(service_handlers, getMetadata, options) { + var server = new Server(getMetadata, options); this.inner_server = server; _.each(services, function(service) { var service_name = common.fullyQualifiedName(service); diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js index 9e1b2a79c8..059dd1323a 100644 --- a/src/node/test/client_server_test.js +++ b/src/node/test/client_server_test.js @@ -84,6 +84,10 @@ function cancelHandler(stream) { // do nothing } +function metadataHandler(stream, metadata) { + stream.end(); +} + /** * Serialize a string to a Buffer * @param {string} value The string to serialize @@ -106,11 +110,14 @@ describe('echo client', function() { var server; var channel; before(function() { - server = new Server(); + server = new Server(function getMetadata(method, metadata) { + return {method: [method]}; + }); var port_num = server.bind('0.0.0.0:0'); server.register('echo', echoHandler); server.register('error', errorHandler); server.register('cancellation', cancelHandler); + server.register('metadata', metadataHandler); server.start(); channel = new grpc.Channel('localhost:' + port_num); @@ -142,12 +149,19 @@ describe('echo client', function() { done(); }); }); + it('should recieve metadata set by the server', function(done) { + var stream = client.makeRequest(channel, 'metadata'); + stream.on('metadata', function(metadata) { + assert.strictEqual(metadata.method[0].toString(), 'metadata'); + }); + stream.on('status', function(status) { + assert.equal(status.code, client.status.OK); + done(); + }); + stream.end(); + }); it('should get an error status that the server throws', function(done) { - var stream = client.makeRequest( - channel, - 'error', - null, - getDeadline(1)); + var stream = client.makeRequest(channel, 'error'); stream.on('data', function() {}); stream.write(new Buffer('test')); @@ -189,7 +203,7 @@ describe('secure echo client', function() { key_data, pem_data); - server = new Server({'credentials' : server_creds}); + server = new Server(null, {'credentials' : server_creds}); var port_num = server.bind('0.0.0.0:0', true); server.register('echo', echoHandler); server.start(); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 85f4841d4b..1038f9ab33 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -75,7 +75,7 @@ describe('Surface server constructor', function() { }, /math.Math/); }); }); -describe('Surface client', function() { +describe('Cancelling surface client', function() { var client; var server; before(function() { |