diff options
author | Tim Emiola <tbetbetbe@users.noreply.github.com> | 2015-04-03 13:17:04 -0700 |
---|---|---|
committer | Tim Emiola <tbetbetbe@users.noreply.github.com> | 2015-04-03 13:17:04 -0700 |
commit | 300ebc4c623b46f45506ac14b31e2660196dc596 (patch) | |
tree | 4101d7db2961df13e7b9ce32576133f587d8e992 | |
parent | ef38504ddd2292fd61d7444b4285e7203858a9c3 (diff) | |
parent | 49388b31a0a363dd82a49f593d1b100e77a09aa9 (diff) |
Merge pull request #1140 from murgatroid99/node_trailing_metadata
Node trailing metadata
-rw-r--r-- | src/node/src/client.js | 4 | ||||
-rw-r--r-- | src/node/src/server.js | 43 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 161 | ||||
-rw-r--r-- | src/node/test/test_service.proto | 52 |
4 files changed, 250 insertions, 10 deletions
diff --git a/src/node/src/client.js b/src/node/src/client.js index c46f7d0526..fad369c2f8 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -241,13 +241,13 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { callback(err); return; } + emitter.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); error.code = response.status.code; callback(error); return; } - emitter.emit('status', response.status); emitter.emit('metadata', response.metadata); callback(null, deserialize(response.read)); }); @@ -312,13 +312,13 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { callback(err); return; } + stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); error.code = response.status.code; callback(error); return; } - stream.emit('status', response.status); callback(null, deserialize(response.read)); }); }); diff --git a/src/node/src/server.js b/src/node/src/server.js index 8a26a43606..05de16294d 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -70,6 +70,9 @@ function handleError(call, error) { status.details = error.details; } } + if (error.hasOwnProperty('metadata')) { + status.metadata = error.metadata; + } var error_batch = {}; error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(error_batch, function(){}); @@ -102,15 +105,20 @@ function waitForCancel(call, emitter) { * @param {*} value The value to respond with * @param {function(*):Buffer=} serialize Serialization function for the * response + * @param {Object=} metadata Optional trailing metadata to send with status */ -function sendUnaryResponse(call, value, serialize) { +function sendUnaryResponse(call, value, serialize, metadata) { var end_batch = {}; - end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); - end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + var status = { code: grpc.status.OK, details: 'OK', metadata: {} }; + if (metadata) { + status.metadata = metadata; + } + end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); + end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(end_batch, function (){}); } @@ -143,6 +151,7 @@ function setUpWritable(stream, serialize) { function setStatus(err) { var code = grpc.status.INTERNAL; var details = 'Unknown Error'; + var metadata = {}; if (err.hasOwnProperty('message')) { details = err.message; } @@ -152,7 +161,10 @@ function setUpWritable(stream, serialize) { details = err.details; } } - stream.status = {code: code, details: details, metadata: {}}; + if (err.hasOwnProperty('metadata')) { + metadata = err.metadata; + } + stream.status = {code: code, details: details, metadata: metadata}; } /** * Terminate the call. This includes indicating that reads are done, draining @@ -166,6 +178,17 @@ function setUpWritable(stream, serialize) { stream.end(); } stream.on('error', terminateCall); + /** + * Override of Writable#end method that allows for sending metadata with a + * success status. + * @param {Object=} metadata Metadata to send with the status + */ + stream.end = function(metadata) { + if (metadata) { + stream.status.metadata = metadata; + } + Writable.prototype.end.call(this); + }; } /** @@ -335,11 +358,13 @@ function handleUnary(call, handler, metadata) { if (emitter.cancelled) { return; } - handler.func(emitter, function sendUnaryData(err, value) { + handler.func(emitter, function sendUnaryData(err, value, trailer) { if (err) { + err.metadata = trailer; handleError(call, err); + } else { + sendUnaryResponse(call, value, handler.serialize, trailer); } - sendUnaryResponse(call, value, handler.serialize); }); }); } @@ -378,12 +403,14 @@ function handleClientStreaming(call, handler, metadata) { var metadata_batch = {}; metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; call.startBatch(metadata_batch, function() {}); - handler.func(stream, function(err, value) { + handler.func(stream, function(err, value, trailer) { stream.terminate(); if (err) { + err.metadata = trailer; handleError(call, err); + } else { + sendUnaryResponse(call, value, handler.serialize, trailer); } - sendUnaryResponse(call, value, handler.serialize); }); } diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 96b47815e1..590c644c71 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -126,6 +126,167 @@ describe('Generic client and server', function() { }); }); }); +describe('Trailing metadata', function() { + var client; + var server; + before(function() { + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_service = test_proto.lookup('TestService'); + var Server = grpc.buildServer([test_service]); + server = new Server({ + TestService: { + unary: function(call, cb) { + var req = call.request; + if (req.error) { + cb(new Error('Requested error'), null, {metadata: ['yes']}); + } else { + cb(null, {count: 1}, {metadata: ['yes']}); + } + }, + clientStream: function(stream, cb){ + var count = 0; + var errored; + stream.on('data', function(data) { + if (data.error) { + errored = true; + cb(new Error('Requested error'), null, {metadata: ['yes']}); + } else { + count += 1; + } + }); + stream.on('end', function() { + if (!errored) { + cb(null, {count: count}, {metadata: ['yes']}); + } + }); + }, + serverStream: function(stream) { + var req = stream.request; + if (req.error) { + var err = new Error('Requested error'); + err.metadata = {metadata: ['yes']}; + stream.emit('error', err); + } else { + for (var i = 0; i < 5; i++) { + stream.write({count: i}); + } + stream.end({metadata: ['yes']}); + } + }, + bidiStream: function(stream) { + var count = 0; + stream.on('data', function(data) { + if (data.error) { + var err = new Error('Requested error'); + err.metadata = { + metadata: ['yes'], + count: ['' + count] + }; + stream.emit('error', err); + } else { + stream.write({count: count}); + count += 1; + } + }); + stream.on('end', function() { + stream.end({metadata: ['yes']}); + }); + } + } + }); + var port = server.bind('localhost:0'); + var Client = surface_client.makeProtobufClientConstructor(test_service); + client = new Client('localhost:' + port); + server.listen(); + }); + after(function() { + server.shutdown(); + }); + it('should be present when a unary call succeeds', function(done) { + var call = client.unary({error: false}, function(err, data) { + assert.ifError(err); + }); + call.on('status', function(status) { + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a unary call fails', function(done) { + var call = client.unary({error: true}, function(err, data) { + assert(err); + }); + call.on('status', function(status) { + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a client stream call succeeds', function(done) { + var call = client.clientStream(function(err, data) { + assert.ifError(err); + }); + call.write({error: false}); + call.write({error: false}); + call.end(); + call.on('status', function(status) { + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a client stream call fails', function(done) { + var call = client.clientStream(function(err, data) { + assert(err); + }); + call.write({error: false}); + call.write({error: true}); + call.end(); + call.on('status', function(status) { + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a server stream call succeeds', function(done) { + var call = client.serverStream({error: false}); + call.on('data', function(){}); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a server stream call fails', function(done) { + var call = client.serverStream({error: true}); + call.on('data', function(){}); + call.on('status', function(status) { + assert.notStrictEqual(status.code, grpc.status.OK); + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a bidi stream succeeds', function(done) { + var call = client.bidiStream(); + call.write({error: false}); + call.write({error: false}); + call.end(); + call.on('data', function(){}); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a bidi stream fails', function(done) { + var call = client.bidiStream(); + call.write({error: false}); + call.write({error: true}); + call.end(); + call.on('data', function(){}); + call.on('status', function(status) { + assert.notStrictEqual(status.code, grpc.status.OK); + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); +}); describe('Cancelling surface client', function() { var client; var server; diff --git a/src/node/test/test_service.proto b/src/node/test/test_service.proto new file mode 100644 index 0000000000..5d3d891841 --- /dev/null +++ b/src/node/test/test_service.proto @@ -0,0 +1,52 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto2"; + +message Request { + optional bool error = 1; +} + +message Response { + optional int32 count = 1; +} + +service TestService { + rpc Unary (Request) returns (Response) { + } + + rpc ClientStream (stream Request) returns (Response) { + } + + rpc ServerStream (Request) returns (stream Response) { + } + + rpc BidiStream (stream Request) returns (stream Response) { + } +}
\ No newline at end of file |