diff options
author | murgatroid99 <michael.lumish@gmail.com> | 2015-01-21 13:43:39 -0800 |
---|---|---|
committer | murgatroid99 <michael.lumish@gmail.com> | 2015-01-21 13:43:39 -0800 |
commit | ec8956435dab45acc7e6308ab3854b2711752cb6 (patch) | |
tree | b9cc8e39a5fc775d7635d33a51d3abbe79a14b62 /src/node/server.js | |
parent | eeccd21fb9c00718876d87bde70fbb09d1bd7ab4 (diff) |
Switched binary stream to object stream
Diffstat (limited to 'src/node/server.js')
-rw-r--r-- | src/node/server.js | 64 |
1 files changed, 54 insertions, 10 deletions
diff --git a/src/node/server.js b/src/node/server.js index e947032b29..c5f56bc4fc 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -47,10 +47,21 @@ util.inherits(GrpcServerStream, Duplex); * from stream.Duplex. * @constructor * @param {grpc.Call} call Call object to proxy - * @param {object} options Stream options + * @param {function(*):Buffer} serialize Serialization function for responses + * @param {function(Buffer):*} deserialize Deserialization function for requests */ -function GrpcServerStream(call, options) { - Duplex.call(this, options); +function GrpcServerStream(call, serialize, deserialize) { + Duplex.call(this, {objectMode: true}); + if (!serialize) { + serialize = function(value) { + return value; + }; + } + if (!deserialize) { + deserialize = function(value) { + return value; + }; + } this._call = call; // Indicate that a status has been sent var finished = false; @@ -59,6 +70,33 @@ function GrpcServerStream(call, options) { 'code' : grpc.status.OK, 'details' : 'OK' }; + + /** + * Serialize a response value to a buffer. Always maps null to null. Otherwise + * uses the provided serialize function + * @param {*} value The value to serialize + * @return {Buffer} The serialized value + */ + this.serialize = function(value) { + if (value === null || value === undefined) { + return null; + } + return serialize(value); + }; + + /** + * Deserialize a request buffer to a value. Always maps null to null. + * Otherwise uses the provided deserialize function. + * @param {Buffer} buffer The buffer to deserialize + * @return {*} The deserialized value + */ + this.deserialize = function(buffer) { + if (buffer === null) { + return null; + } + return deserialize(buffer); + }; + /** * Send the pending status */ @@ -75,7 +113,6 @@ function GrpcServerStream(call, options) { * @param {Error} err The error object */ function setStatus(err) { - console.log('Server setting status to', err); var code = grpc.status.INTERNAL; var details = 'Unknown Error'; @@ -113,7 +150,7 @@ function GrpcServerStream(call, options) { return; } var data = event.data; - if (self.push(data) && data != null) { + if (self.push(deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; @@ -155,7 +192,7 @@ GrpcServerStream.prototype._read = function(size) { */ GrpcServerStream.prototype._write = function(chunk, encoding, callback) { var self = this; - self._call.startWrite(chunk, function(event) { + self._call.startWrite(self.serialize(chunk), function(event) { callback(); }, 0); }; @@ -211,12 +248,13 @@ function Server(options) { } }, 0); call.serverEndInitialMetadata(0); - var stream = new GrpcServerStream(call); + var stream = new GrpcServerStream(call, handler.serialize, + handler.deserialize); Object.defineProperty(stream, 'cancelled', { get: function() { return cancelled;} }); try { - handler(stream, data.metadata); + handler.func(stream, data.metadata); } catch (e) { stream.emit('error', e); } @@ -237,14 +275,20 @@ function Server(options) { * handle/respond to. * @param {function} handler Function that takes a stream of request values and * returns a stream of response values + * @param {function(*):Buffer} serialize Serialization function for responses + * @param {function(Buffer):*} deserialize Deserialization function for requests * @return {boolean} True if the handler was set. False if a handler was already * set for that name. */ -Server.prototype.register = function(name, handler) { +Server.prototype.register = function(name, handler, serialize, deserialize) { if (this.handlers.hasOwnProperty(name)) { return false; } - this.handlers[name] = handler; + this.handlers[name] = { + func: handler, + serialize: serialize, + deserialize: deserialize + }; return true; }; |