aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-04-03 13:17:04 -0700
committerGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-04-03 13:17:04 -0700
commit300ebc4c623b46f45506ac14b31e2660196dc596 (patch)
tree4101d7db2961df13e7b9ce32576133f587d8e992
parentef38504ddd2292fd61d7444b4285e7203858a9c3 (diff)
parent49388b31a0a363dd82a49f593d1b100e77a09aa9 (diff)
Merge pull request #1140 from murgatroid99/node_trailing_metadata
Node trailing metadata
-rw-r--r--src/node/src/client.js4
-rw-r--r--src/node/src/server.js43
-rw-r--r--src/node/test/surface_test.js161
-rw-r--r--src/node/test/test_service.proto52
4 files changed, 250 insertions, 10 deletions
diff --git a/src/node/src/client.js b/src/node/src/client.js
index c46f7d0526..fad369c2f8 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -241,13 +241,13 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
callback(err);
return;
}
+ emitter.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
callback(error);
return;
}
- emitter.emit('status', response.status);
emitter.emit('metadata', response.metadata);
callback(null, deserialize(response.read));
});
@@ -312,13 +312,13 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
callback(err);
return;
}
+ stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
callback(error);
return;
}
- stream.emit('status', response.status);
callback(null, deserialize(response.read));
});
});
diff --git a/src/node/src/server.js b/src/node/src/server.js
index 8a26a43606..05de16294d 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -70,6 +70,9 @@ function handleError(call, error) {
status.details = error.details;
}
}
+ if (error.hasOwnProperty('metadata')) {
+ status.metadata = error.metadata;
+ }
var error_batch = {};
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){});
@@ -102,15 +105,20 @@ function waitForCancel(call, emitter) {
* @param {*} value The value to respond with
* @param {function(*):Buffer=} serialize Serialization function for the
* response
+ * @param {Object=} metadata Optional trailing metadata to send with status
*/
-function sendUnaryResponse(call, value, serialize) {
+function sendUnaryResponse(call, value, serialize, metadata) {
var end_batch = {};
- end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
- end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ var status = {
code: grpc.status.OK,
details: 'OK',
metadata: {}
};
+ if (metadata) {
+ status.metadata = metadata;
+ }
+ end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
+ end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
}
@@ -143,6 +151,7 @@ function setUpWritable(stream, serialize) {
function setStatus(err) {
var code = grpc.status.INTERNAL;
var details = 'Unknown Error';
+ var metadata = {};
if (err.hasOwnProperty('message')) {
details = err.message;
}
@@ -152,7 +161,10 @@ function setUpWritable(stream, serialize) {
details = err.details;
}
}
- stream.status = {code: code, details: details, metadata: {}};
+ if (err.hasOwnProperty('metadata')) {
+ metadata = err.metadata;
+ }
+ stream.status = {code: code, details: details, metadata: metadata};
}
/**
* Terminate the call. This includes indicating that reads are done, draining
@@ -166,6 +178,17 @@ function setUpWritable(stream, serialize) {
stream.end();
}
stream.on('error', terminateCall);
+ /**
+ * Override of Writable#end method that allows for sending metadata with a
+ * success status.
+ * @param {Object=} metadata Metadata to send with the status
+ */
+ stream.end = function(metadata) {
+ if (metadata) {
+ stream.status.metadata = metadata;
+ }
+ Writable.prototype.end.call(this);
+ };
}
/**
@@ -335,11 +358,13 @@ function handleUnary(call, handler, metadata) {
if (emitter.cancelled) {
return;
}
- handler.func(emitter, function sendUnaryData(err, value) {
+ handler.func(emitter, function sendUnaryData(err, value, trailer) {
if (err) {
+ err.metadata = trailer;
handleError(call, err);
+ } else {
+ sendUnaryResponse(call, value, handler.serialize, trailer);
}
- sendUnaryResponse(call, value, handler.serialize);
});
});
}
@@ -378,12 +403,14 @@ function handleClientStreaming(call, handler, metadata) {
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
call.startBatch(metadata_batch, function() {});
- handler.func(stream, function(err, value) {
+ handler.func(stream, function(err, value, trailer) {
stream.terminate();
if (err) {
+ err.metadata = trailer;
handleError(call, err);
+ } else {
+ sendUnaryResponse(call, value, handler.serialize, trailer);
}
- sendUnaryResponse(call, value, handler.serialize);
});
}
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 96b47815e1..590c644c71 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -126,6 +126,167 @@ describe('Generic client and server', function() {
});
});
});
+describe('Trailing metadata', function() {
+ var client;
+ var server;
+ before(function() {
+ var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
+ var test_service = test_proto.lookup('TestService');
+ var Server = grpc.buildServer([test_service]);
+ server = new Server({
+ TestService: {
+ unary: function(call, cb) {
+ var req = call.request;
+ if (req.error) {
+ cb(new Error('Requested error'), null, {metadata: ['yes']});
+ } else {
+ cb(null, {count: 1}, {metadata: ['yes']});
+ }
+ },
+ clientStream: function(stream, cb){
+ var count = 0;
+ var errored;
+ stream.on('data', function(data) {
+ if (data.error) {
+ errored = true;
+ cb(new Error('Requested error'), null, {metadata: ['yes']});
+ } else {
+ count += 1;
+ }
+ });
+ stream.on('end', function() {
+ if (!errored) {
+ cb(null, {count: count}, {metadata: ['yes']});
+ }
+ });
+ },
+ serverStream: function(stream) {
+ var req = stream.request;
+ if (req.error) {
+ var err = new Error('Requested error');
+ err.metadata = {metadata: ['yes']};
+ stream.emit('error', err);
+ } else {
+ for (var i = 0; i < 5; i++) {
+ stream.write({count: i});
+ }
+ stream.end({metadata: ['yes']});
+ }
+ },
+ bidiStream: function(stream) {
+ var count = 0;
+ stream.on('data', function(data) {
+ if (data.error) {
+ var err = new Error('Requested error');
+ err.metadata = {
+ metadata: ['yes'],
+ count: ['' + count]
+ };
+ stream.emit('error', err);
+ } else {
+ stream.write({count: count});
+ count += 1;
+ }
+ });
+ stream.on('end', function() {
+ stream.end({metadata: ['yes']});
+ });
+ }
+ }
+ });
+ var port = server.bind('localhost:0');
+ var Client = surface_client.makeProtobufClientConstructor(test_service);
+ client = new Client('localhost:' + port);
+ server.listen();
+ });
+ after(function() {
+ server.shutdown();
+ });
+ it('should be present when a unary call succeeds', function(done) {
+ var call = client.unary({error: false}, function(err, data) {
+ assert.ifError(err);
+ });
+ call.on('status', function(status) {
+ assert.deepEqual(status.metadata.metadata, ['yes']);
+ done();
+ });
+ });
+ it('should be present when a unary call fails', function(done) {
+ var call = client.unary({error: true}, function(err, data) {
+ assert(err);
+ });
+ call.on('status', function(status) {
+ assert.deepEqual(status.metadata.metadata, ['yes']);
+ done();
+ });
+ });
+ it('should be present when a client stream call succeeds', function(done) {
+ var call = client.clientStream(function(err, data) {
+ assert.ifError(err);
+ });
+ call.write({error: false});
+ call.write({error: false});
+ call.end();
+ call.on('status', function(status) {
+ assert.deepEqual(status.metadata.metadata, ['yes']);
+ done();
+ });
+ });
+ it('should be present when a client stream call fails', function(done) {
+ var call = client.clientStream(function(err, data) {
+ assert(err);
+ });
+ call.write({error: false});
+ call.write({error: true});
+ call.end();
+ call.on('status', function(status) {
+ assert.deepEqual(status.metadata.metadata, ['yes']);
+ done();
+ });
+ });
+ it('should be present when a server stream call succeeds', function(done) {
+ var call = client.serverStream({error: false});
+ call.on('data', function(){});
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, grpc.status.OK);
+ assert.deepEqual(status.metadata.metadata, ['yes']);
+ done();
+ });
+ });
+ it('should be present when a server stream call fails', function(done) {
+ var call = client.serverStream({error: true});
+ call.on('data', function(){});
+ call.on('status', function(status) {
+ assert.notStrictEqual(status.code, grpc.status.OK);
+ assert.deepEqual(status.metadata.metadata, ['yes']);
+ done();
+ });
+ });
+ it('should be present when a bidi stream succeeds', function(done) {
+ var call = client.bidiStream();
+ call.write({error: false});
+ call.write({error: false});
+ call.end();
+ call.on('data', function(){});
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, grpc.status.OK);
+ assert.deepEqual(status.metadata.metadata, ['yes']);
+ done();
+ });
+ });
+ it('should be present when a bidi stream fails', function(done) {
+ var call = client.bidiStream();
+ call.write({error: false});
+ call.write({error: true});
+ call.end();
+ call.on('data', function(){});
+ call.on('status', function(status) {
+ assert.notStrictEqual(status.code, grpc.status.OK);
+ assert.deepEqual(status.metadata.metadata, ['yes']);
+ done();
+ });
+ });
+});
describe('Cancelling surface client', function() {
var client;
var server;
diff --git a/src/node/test/test_service.proto b/src/node/test/test_service.proto
new file mode 100644
index 0000000000..5d3d891841
--- /dev/null
+++ b/src/node/test/test_service.proto
@@ -0,0 +1,52 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto2";
+
+message Request {
+ optional bool error = 1;
+}
+
+message Response {
+ optional int32 count = 1;
+}
+
+service TestService {
+ rpc Unary (Request) returns (Response) {
+ }
+
+ rpc ClientStream (stream Request) returns (Response) {
+ }
+
+ rpc ServerStream (Request) returns (stream Response) {
+ }
+
+ rpc BidiStream (stream Request) returns (stream Response) {
+ }
+} \ No newline at end of file