diff options
Diffstat (limited to 'src/node/client.js')
-rw-r--r-- | src/node/client.js | 53 |
1 files changed, 48 insertions, 5 deletions
diff --git a/src/node/client.js b/src/node/client.js index edaa115d0f..f913b06f29 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -45,10 +45,22 @@ util.inherits(GrpcClientStream, Duplex); * from stream.Duplex. * @constructor * @param {grpc.Call} call Call object to proxy - * @param {object} options Stream options + * @param {function(*):Buffer=} serialize Serialization function for requests + * @param {function(Buffer):*=} deserialize Deserialization function for + * responses */ -function GrpcClientStream(call, options) { - Duplex.call(this, options); +function GrpcClientStream(call, serialize, deserialize) { + Duplex.call(this, {objectMode: true}); + if (!serialize) { + serialize = function(value) { + return value; + }; + } + if (!deserialize) { + deserialize = function(value) { + return value; + }; + } var self = this; // Indicates that we can start reading and have not received a null read var can_read = false; @@ -59,6 +71,32 @@ function GrpcClientStream(call, options) { // Indicates that a write is currently pending var writing = false; this._call = call; + + /** + * Serialize a request value to a buffer. Always maps null to null. Otherwise + * uses the provided serialize function + * @param {*} value The value to serialize + * @return {Buffer} The serialized value + */ + this.serialize = function(value) { + if (value === null || value === undefined) { + return null; + } + return serialize(value); + }; + + /** + * Deserialize a response buffer to a value. Always maps null to null. + * Otherwise uses the provided deserialize function. + * @param {Buffer} buffer The buffer to deserialize + * @return {*} The deserialized value + */ + this.deserialize = function(buffer) { + if (buffer === null) { + return null; + } + return deserialize(buffer); + }; /** * Callback to handle receiving a READ event. Pushes the data from that event * onto the read queue and starts reading again if applicable. @@ -66,7 +104,7 @@ function GrpcClientStream(call, options) { */ function readCallback(event) { var data = event.data; - if (self.push(data)) { + if (self.push(self.deserialize(data))) { if (data == null) { // Disable starting to read after null read was received can_read = false; @@ -102,7 +140,7 @@ function GrpcClientStream(call, options) { next.callback(); writeNext(); }; - call.startWrite(next.chunk, writeCallback, 0); + call.startWrite(self.serialize(next.chunk), writeCallback, 0); } else { writing = false; } @@ -171,6 +209,9 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { * Make a request on the channel to the given method with the given arguments * @param {grpc.Channel} channel The channel on which to make the request * @param {string} method The method to request + * @param {function(*):Buffer} serialize Serialization function for requests + * @param {function(Buffer):*} deserialize Deserialization function for + * responses * @param {array=} metadata Array of metadata key/value pairs to add to the call * @param {(number|Date)=} deadline The deadline for processing this request. * Defaults to infinite future. @@ -178,6 +219,8 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { */ function makeRequest(channel, method, + serialize, + deserialize, metadata, deadline) { if (deadline === undefined) { |