aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-01-29 14:17:51 -0800
committerGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-01-29 14:17:51 -0800
commit502196669bc6a7bc7d218b7b55a3b55eee728d80 (patch)
tree4247e05234ff378dbe3a4898d9e9f4949ff0ae7e
parentd87c9ea3c9aec16fff5f52dd221d728701c78dcf (diff)
parent4d2d0f0f3acd1e90037524803e7cb6b445f9c676 (diff)
Merge pull request #283 from murgatroid99/node_server_metadata
Add an API for servers to send metadata to clients
-rw-r--r--src/node/interop/interop_server.js2
-rw-r--r--src/node/src/server.js10
-rw-r--r--src/node/src/surface_server.js43
-rw-r--r--src/node/test/client_server_test.js28
-rw-r--r--src/node/test/surface_test.js2
5 files changed, 56 insertions, 29 deletions
diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index ebf847876c..54e9715d1e 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -183,7 +183,7 @@ function getServer(port, tls) {
fullDuplexCall: handleFullDuplex,
halfDuplexCall: handleHalfDuplex
}
- }, options);
+ }, null, options);
var port_num = server.bind('0.0.0.0:' + port, tls);
return {server: server, port: port_num};
}
diff --git a/src/node/src/server.js b/src/node/src/server.js
index 03cdbe6f98..a5d737c68d 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -202,10 +202,13 @@ GrpcServerStream.prototype._write = function(chunk, encoding, callback) {
* Constructs a server object that stores request handlers and delegates
* incoming requests to those handlers
* @constructor
- * @param {Array} options Options that should be passed to the internal server
+ * @param {function(string, Object<string, Array<Buffer>>):
+ Object<string, Array<Buffer|string>>=} getMetadata Callback that gets
+ * metatada for a given method
+ * @param {Object=} options Options that should be passed to the internal server
* implementation
*/
-function Server(options) {
+function Server(getMetadata, options) {
this.handlers = {};
var handlers = this.handlers;
var server = new grpc.Server(options);
@@ -249,6 +252,9 @@ function Server(options) {
stream.emit('cancelled');
}
}, 0);
+ if (getMetadata) {
+ call.addMetadata(getMetadata(data.method, data.metadata));
+ }
call.serverEndInitialMetadata(0);
var stream = new GrpcServerStream(call, handler.serialize,
handler.deserialize);
diff --git a/src/node/src/surface_server.js b/src/node/src/surface_server.js
index af23ec211c..a47d1fa23d 100644
--- a/src/node/src/surface_server.js
+++ b/src/node/src/surface_server.js
@@ -129,16 +129,18 @@ ServerWritableObjectStream.prototype._write = _write;
/**
* Creates a binary stream handler function from a unary handler function
- * @param {function(Object, function(Error, *))} handler Unary call handler
- * @return {function(stream)} Binary stream handler
+ * @param {function(Object, function(Error, *), metadata=)} handler Unary call
+ * handler
+ * @return {function(stream, metadata=)} Binary stream handler
*/
function makeUnaryHandler(handler) {
/**
* Handles a stream by reading a single data value, passing it to the handler,
* and writing the response back to the stream.
* @param {stream} stream Binary data stream
+ * @param {metadata=} metadata Incoming metadata array
*/
- return function handleUnaryCall(stream) {
+ return function handleUnaryCall(stream, metadata) {
stream.on('data', function handleUnaryData(value) {
var call = {request: value};
Object.defineProperty(call, 'cancelled', {
@@ -154,7 +156,7 @@ function makeUnaryHandler(handler) {
stream.write(value);
stream.end();
}
- });
+ }, metadata);
});
};
}
@@ -162,17 +164,18 @@ function makeUnaryHandler(handler) {
/**
* Creates a binary stream handler function from a client stream handler
* function
- * @param {function(Readable, function(Error, *))} handler Client stream call
- * handler
- * @return {function(stream)} Binary stream handler
+ * @param {function(Readable, function(Error, *), metadata=)} handler Client
+ * stream call handler
+ * @return {function(stream, metadata=)} Binary stream handler
*/
function makeClientStreamHandler(handler) {
/**
* Handles a stream by passing a deserializing stream to the handler and
* writing the response back to the stream.
* @param {stream} stream Binary data stream
+ * @param {metadata=} metadata Incoming metadata array
*/
- return function handleClientStreamCall(stream) {
+ return function handleClientStreamCall(stream, metadata) {
var object_stream = new ServerReadableObjectStream(stream);
handler(object_stream, function sendClientStreamData(err, value) {
if (err) {
@@ -181,35 +184,36 @@ function makeClientStreamHandler(handler) {
stream.write(value);
stream.end();
}
- });
+ }, metadata);
};
}
/**
* Creates a binary stream handler function from a server stream handler
* function
- * @param {function(Writable)} handler Server stream call handler
- * @return {function(stream)} Binary stream handler
+ * @param {function(Writable, metadata=)} handler Server stream call handler
+ * @return {function(stream, metadata=)} Binary stream handler
*/
function makeServerStreamHandler(handler) {
/**
* Handles a stream by attaching it to a serializing stream, and passing it to
* the handler.
* @param {stream} stream Binary data stream
+ * @param {metadata=} metadata Incoming metadata array
*/
- return function handleServerStreamCall(stream) {
+ return function handleServerStreamCall(stream, metadata) {
stream.on('data', function handleClientData(value) {
var object_stream = new ServerWritableObjectStream(stream);
object_stream.request = value;
- handler(object_stream);
+ handler(object_stream, metadata);
});
};
}
/**
* Creates a binary stream handler function from a bidi stream handler function
- * @param {function(Duplex)} handler Unary call handler
- * @return {function(stream)} Binary stream handler
+ * @param {function(Duplex, metadata=)} handler Unary call handler
+ * @return {function(stream, metadata=)} Binary stream handler
*/
function makeBidiStreamHandler(handler) {
return handler;
@@ -252,10 +256,13 @@ function makeServerConstructor(services) {
* @constructor
* @param {Object} service_handlers Map from service names to map from method
* names to handlers
- * @param {Object} options Options to pass to the underlying server
+ * @param {function(string, Object<string, Array<Buffer>>):
+ Object<string, Array<Buffer|string>>=} getMetadata Callback that
+ * gets metatada for a given method
+ * @param {Object=} options Options to pass to the underlying server
*/
- function SurfaceServer(service_handlers, options) {
- var server = new Server(options);
+ function SurfaceServer(service_handlers, getMetadata, options) {
+ var server = new Server(getMetadata, options);
this.inner_server = server;
_.each(services, function(service) {
var service_name = common.fullyQualifiedName(service);
diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js
index 9e1b2a79c8..059dd1323a 100644
--- a/src/node/test/client_server_test.js
+++ b/src/node/test/client_server_test.js
@@ -84,6 +84,10 @@ function cancelHandler(stream) {
// do nothing
}
+function metadataHandler(stream, metadata) {
+ stream.end();
+}
+
/**
* Serialize a string to a Buffer
* @param {string} value The string to serialize
@@ -106,11 +110,14 @@ describe('echo client', function() {
var server;
var channel;
before(function() {
- server = new Server();
+ server = new Server(function getMetadata(method, metadata) {
+ return {method: [method]};
+ });
var port_num = server.bind('0.0.0.0:0');
server.register('echo', echoHandler);
server.register('error', errorHandler);
server.register('cancellation', cancelHandler);
+ server.register('metadata', metadataHandler);
server.start();
channel = new grpc.Channel('localhost:' + port_num);
@@ -142,12 +149,19 @@ describe('echo client', function() {
done();
});
});
+ it('should recieve metadata set by the server', function(done) {
+ var stream = client.makeRequest(channel, 'metadata');
+ stream.on('metadata', function(metadata) {
+ assert.strictEqual(metadata.method[0].toString(), 'metadata');
+ });
+ stream.on('status', function(status) {
+ assert.equal(status.code, client.status.OK);
+ done();
+ });
+ stream.end();
+ });
it('should get an error status that the server throws', function(done) {
- var stream = client.makeRequest(
- channel,
- 'error',
- null,
- getDeadline(1));
+ var stream = client.makeRequest(channel, 'error');
stream.on('data', function() {});
stream.write(new Buffer('test'));
@@ -189,7 +203,7 @@ describe('secure echo client', function() {
key_data,
pem_data);
- server = new Server({'credentials' : server_creds});
+ server = new Server(null, {'credentials' : server_creds});
var port_num = server.bind('0.0.0.0:0', true);
server.register('echo', echoHandler);
server.start();
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 85f4841d4b..1038f9ab33 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -75,7 +75,7 @@ describe('Surface server constructor', function() {
}, /math.Math/);
});
});
-describe('Surface client', function() {
+describe('Cancelling surface client', function() {
var client;
var server;
before(function() {