From ec8956435dab45acc7e6308ab3854b2711752cb6 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 21 Jan 2015 13:43:39 -0800 Subject: Switched binary stream to object stream --- src/node/client.js | 53 ++++++++++++++++++++++++++--- src/node/server.js | 64 ++++++++++++++++++++++++++++++------ src/node/test/interop_sanity_test.js | 2 +- 3 files changed, 103 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/node/client.js b/src/node/client.js index edaa115d0f..fe7e07e29c 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -45,10 +45,22 @@ util.inherits(GrpcClientStream, Duplex); * from stream.Duplex. * @constructor * @param {grpc.Call} call Call object to proxy - * @param {object} options Stream options + * @param {function(*):Buffer} serialize Serialization function for requests + * @param {function(Buffer):*} deserialize Deserialization function for + * responses */ -function GrpcClientStream(call, options) { - Duplex.call(this, options); +function GrpcClientStream(call, serialize, deserialize) { + Duplex.call(this, {objectMode: true}); + if (!serialize) { + serialize = function(value) { + return value; + }; + } + if (!deserialize) { + deserialize = function(value) { + return value; + }; + } var self = this; // Indicates that we can start reading and have not received a null read var can_read = false; @@ -59,6 +71,32 @@ function GrpcClientStream(call, options) { // Indicates that a write is currently pending var writing = false; this._call = call; + + /** + * Serialize a request value to a buffer. Always maps null to null. Otherwise + * uses the provided serialize function + * @param {*} value The value to serialize + * @return {Buffer} The serialized value + */ + this.serialize = function(value) { + if (value === null || value === undefined) { + return null; + } + return serialize(value); + }; + + /** + * Deserialize a response buffer to a value. Always maps null to null. + * Otherwise uses the provided deserialize function. + * @param {Buffer} buffer The buffer to deserialize + * @return {*} The deserialized value + */ + this.deserialize = function(buffer) { + if (buffer === null) { + return null; + } + return deserialize(buffer); + }; /** * Callback to handle receiving a READ event. Pushes the data from that event * onto the read queue and starts reading again if applicable. @@ -66,7 +104,7 @@ function GrpcClientStream(call, options) { */ function readCallback(event) { var data = event.data; - if (self.push(data)) { + if (self.push(self.deserialize(data))) { if (data == null) { // Disable starting to read after null read was received can_read = false; @@ -102,7 +140,7 @@ function GrpcClientStream(call, options) { next.callback(); writeNext(); }; - call.startWrite(next.chunk, writeCallback, 0); + call.startWrite(self.serialize(next.chunk), writeCallback, 0); } else { writing = false; } @@ -171,6 +209,9 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { * Make a request on the channel to the given method with the given arguments * @param {grpc.Channel} channel The channel on which to make the request * @param {string} method The method to request + * @param {function(*):Buffer} serialize Serialization function for requests + * @param {function(Buffer):*} deserialize Deserialization function for + * responses * @param {array=} metadata Array of metadata key/value pairs to add to the call * @param {(number|Date)=} deadline The deadline for processing this request. * Defaults to infinite future. @@ -178,6 +219,8 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { */ function makeRequest(channel, method, + serialize, + deserialize, metadata, deadline) { if (deadline === undefined) { diff --git a/src/node/server.js b/src/node/server.js index e947032b29..c5f56bc4fc 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -47,10 +47,21 @@ util.inherits(GrpcServerStream, Duplex); * from stream.Duplex. * @constructor * @param {grpc.Call} call Call object to proxy - * @param {object} options Stream options + * @param {function(*):Buffer} serialize Serialization function for responses + * @param {function(Buffer):*} deserialize Deserialization function for requests */ -function GrpcServerStream(call, options) { - Duplex.call(this, options); +function GrpcServerStream(call, serialize, deserialize) { + Duplex.call(this, {objectMode: true}); + if (!serialize) { + serialize = function(value) { + return value; + }; + } + if (!deserialize) { + deserialize = function(value) { + return value; + }; + } this._call = call; // Indicate that a status has been sent var finished = false; @@ -59,6 +70,33 @@ function GrpcServerStream(call, options) { 'code' : grpc.status.OK, 'details' : 'OK' }; + + /** + * Serialize a response value to a buffer. Always maps null to null. Otherwise + * uses the provided serialize function + * @param {*} value The value to serialize + * @return {Buffer} The serialized value + */ + this.serialize = function(value) { + if (value === null || value === undefined) { + return null; + } + return serialize(value); + }; + + /** + * Deserialize a request buffer to a value. Always maps null to null. + * Otherwise uses the provided deserialize function. + * @param {Buffer} buffer The buffer to deserialize + * @return {*} The deserialized value + */ + this.deserialize = function(buffer) { + if (buffer === null) { + return null; + } + return deserialize(buffer); + }; + /** * Send the pending status */ @@ -75,7 +113,6 @@ function GrpcServerStream(call, options) { * @param {Error} err The error object */ function setStatus(err) { - console.log('Server setting status to', err); var code = grpc.status.INTERNAL; var details = 'Unknown Error'; @@ -113,7 +150,7 @@ function GrpcServerStream(call, options) { return; } var data = event.data; - if (self.push(data) && data != null) { + if (self.push(deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; @@ -155,7 +192,7 @@ GrpcServerStream.prototype._read = function(size) { */ GrpcServerStream.prototype._write = function(chunk, encoding, callback) { var self = this; - self._call.startWrite(chunk, function(event) { + self._call.startWrite(self.serialize(chunk), function(event) { callback(); }, 0); }; @@ -211,12 +248,13 @@ function Server(options) { } }, 0); call.serverEndInitialMetadata(0); - var stream = new GrpcServerStream(call); + var stream = new GrpcServerStream(call, handler.serialize, + handler.deserialize); Object.defineProperty(stream, 'cancelled', { get: function() { return cancelled;} }); try { - handler(stream, data.metadata); + handler.func(stream, data.metadata); } catch (e) { stream.emit('error', e); } @@ -237,14 +275,20 @@ function Server(options) { * handle/respond to. * @param {function} handler Function that takes a stream of request values and * returns a stream of response values + * @param {function(*):Buffer} serialize Serialization function for responses + * @param {function(Buffer):*} deserialize Deserialization function for requests * @return {boolean} True if the handler was set. False if a handler was already * set for that name. */ -Server.prototype.register = function(name, handler) { +Server.prototype.register = function(name, handler, serialize, deserialize) { if (this.handlers.hasOwnProperty(name)) { return false; } - this.handlers[name] = handler; + this.handlers[name] = { + func: handler, + serialize: serialize, + deserialize: deserialize + }; return true; }; diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 9959a165ad..7aa95c6aeb 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -52,7 +52,7 @@ describe('Interop tests', function() { }); }); // This depends on not using a binary stream - it.skip('should pass empty_unary', function(done) { + it('should pass empty_unary', function(done) { interop_client.runTest(port, name_override, 'empty_unary', true, done); }); it('should pass large_unary', function(done) { -- cgit v1.2.3 From cdc2a738c333cc63bd66a546929f41e358af6ab9 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 21 Jan 2015 17:26:04 -0800 Subject: Indicated that serialize and deserialize are optional --- src/node/client.js | 4 ++-- src/node/server.js | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/node/client.js b/src/node/client.js index fe7e07e29c..f913b06f29 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -45,8 +45,8 @@ util.inherits(GrpcClientStream, Duplex); * from stream.Duplex. * @constructor * @param {grpc.Call} call Call object to proxy - * @param {function(*):Buffer} serialize Serialization function for requests - * @param {function(Buffer):*} deserialize Deserialization function for + * @param {function(*):Buffer=} serialize Serialization function for requests + * @param {function(Buffer):*=} deserialize Deserialization function for * responses */ function GrpcClientStream(call, serialize, deserialize) { diff --git a/src/node/server.js b/src/node/server.js index 27a18f5228..eca20aa5fd 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -47,8 +47,9 @@ util.inherits(GrpcServerStream, Duplex); * from stream.Duplex. * @constructor * @param {grpc.Call} call Call object to proxy - * @param {function(*):Buffer} serialize Serialization function for responses - * @param {function(Buffer):*} deserialize Deserialization function for requests + * @param {function(*):Buffer=} serialize Serialization function for responses + * @param {function(Buffer):*=} deserialize Deserialization function for + * requests */ function GrpcServerStream(call, serialize, deserialize) { Duplex.call(this, {objectMode: true}); -- cgit v1.2.3