diff options
Diffstat (limited to 'src/node/server.js')
-rw-r--r-- | src/node/server.js | 71 |
1 files changed, 58 insertions, 13 deletions
diff --git a/src/node/server.js b/src/node/server.js index e947032b29..fe50acb5a1 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -47,10 +47,22 @@ util.inherits(GrpcServerStream, Duplex); * from stream.Duplex. * @constructor * @param {grpc.Call} call Call object to proxy - * @param {object} options Stream options + * @param {function(*):Buffer=} serialize Serialization function for responses + * @param {function(Buffer):*=} deserialize Deserialization function for + * requests */ -function GrpcServerStream(call, options) { - Duplex.call(this, options); +function GrpcServerStream(call, serialize, deserialize) { + Duplex.call(this, {objectMode: true}); + if (!serialize) { + serialize = function(value) { + return value; + }; + } + if (!deserialize) { + deserialize = function(value) { + return value; + }; + } this._call = call; // Indicate that a status has been sent var finished = false; @@ -59,6 +71,33 @@ function GrpcServerStream(call, options) { 'code' : grpc.status.OK, 'details' : 'OK' }; + + /** + * Serialize a response value to a buffer. Always maps null to null. Otherwise + * uses the provided serialize function + * @param {*} value The value to serialize + * @return {Buffer} The serialized value + */ + this.serialize = function(value) { + if (value === null || value === undefined) { + return null; + } + return serialize(value); + }; + + /** + * Deserialize a request buffer to a value. Always maps null to null. + * Otherwise uses the provided deserialize function. + * @param {Buffer} buffer The buffer to deserialize + * @return {*} The deserialized value + */ + this.deserialize = function(buffer) { + if (buffer === null) { + return null; + } + return deserialize(buffer); + }; + /** * Send the pending status */ @@ -75,7 +114,6 @@ function GrpcServerStream(call, options) { * @param {Error} err The error object */ function setStatus(err) { - console.log('Server setting status to', err); var code = grpc.status.INTERNAL; var details = 'Unknown Error'; @@ -113,7 +151,7 @@ function GrpcServerStream(call, options) { return; } var data = event.data; - if (self.push(data) && data != null) { + if (self.push(self.deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; @@ -155,7 +193,7 @@ GrpcServerStream.prototype._read = function(size) { */ GrpcServerStream.prototype._write = function(chunk, encoding, callback) { var self = this; - self._call.startWrite(chunk, function(event) { + self._call.startWrite(self.serialize(chunk), function(event) { callback(); }, 0); }; @@ -195,7 +233,7 @@ function Server(options) { function handleNewCall(event) { var call = event.call; var data = event.data; - if (data == null) { + if (data === null) { return; } server.requestCall(handleNewCall); @@ -211,12 +249,13 @@ function Server(options) { } }, 0); call.serverEndInitialMetadata(0); - var stream = new GrpcServerStream(call); + var stream = new GrpcServerStream(call, handler.serialize, + handler.deserialize); Object.defineProperty(stream, 'cancelled', { get: function() { return cancelled;} }); try { - handler(stream, data.metadata); + handler.func(stream, data.metadata); } catch (e) { stream.emit('error', e); } @@ -237,14 +276,20 @@ function Server(options) { * handle/respond to. * @param {function} handler Function that takes a stream of request values and * returns a stream of response values + * @param {function(*):Buffer} serialize Serialization function for responses + * @param {function(Buffer):*} deserialize Deserialization function for requests * @return {boolean} True if the handler was set. False if a handler was already * set for that name. */ -Server.prototype.register = function(name, handler) { +Server.prototype.register = function(name, handler, serialize, deserialize) { if (this.handlers.hasOwnProperty(name)) { return false; } - this.handlers[name] = handler; + this.handlers[name] = { + func: handler, + serialize: serialize, + deserialize: deserialize + }; return true; }; @@ -256,9 +301,9 @@ Server.prototype.register = function(name, handler) { */ Server.prototype.bind = function(port, secure) { if (secure) { - this._server.addSecureHttp2Port(port); + return this._server.addSecureHttp2Port(port); } else { - this._server.addHttp2Port(port); + return this._server.addHttp2Port(port); } }; |