diff options
Diffstat (limited to 'src/node/surface_client.js')
-rw-r--r-- | src/node/surface_client.js | 113 |
1 files changed, 34 insertions, 79 deletions
diff --git a/src/node/surface_client.js b/src/node/surface_client.js index 996e3d101f..b63ae13e8d 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -63,114 +63,70 @@ util.inherits(ClientReadableObjectStream, Readable); * client side. Extends from stream.Readable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ClientReadableObjectStream(stream, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ClientReadableObjectStream(stream) { + var options = {objectMode: true}; Readable.call(this, options); this._stream = stream; var self = this; forwardEvent(stream, this, 'status'); forwardEvent(stream, this, 'metadata'); this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); this._stream.pause(); } -util.inherits(ClientWritableObjectStream, Writable); - /** - * Class for representing a gRPC client streaming call as a Node stream on the - * client side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {object} options Stream options + * _read implementation for both types of streams that allow reading. + * @this {ClientReadableObjectStream} + * @param {number} size Ignored */ -function ClientWritableObjectStream(stream, serialize, options) { - options = _.extend(options, {objectMode: true}); - Writable.call(this, options); - this._stream = stream; - this._serialize = serialize; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this.on('finish', function() { - this._stream.end(); - }); +function _read(size) { + this._stream.resume(); } +/** + * See docs for _read + */ +ClientReadableObjectStream.prototype._read = _read; -util.inherits(ClientBidiObjectStream, Duplex); +util.inherits(ClientWritableObjectStream, Writable); /** - * Class for representing a gRPC bidi streaming call as a Node stream on the - * client side. Extends from stream.Duplex. + * Class for representing a gRPC client streaming call as a Node stream on the + * client side. Extends from stream.Writable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ClientBidiObjectStream(stream, serialize, deserialize, options) { - options = _.extend(options, {objectMode: true}); - Duplex.call(this, options); +function ClientWritableObjectStream(stream) { + var options = {objectMode: true}; + Writable.call(this, options); this._stream = stream; - this._serialize = serialize; - var self = this; forwardEvent(stream, this, 'status'); forwardEvent(stream, this, 'metadata'); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { - self._stream.pause(); - } - }); - this._stream.pause(); this.on('finish', function() { this._stream.end(); }); } /** - * _read implementation for both types of streams that allow reading. - * @this {ClientReadableObjectStream|ClientBidiObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ClientReadableObjectStream.prototype._read = _read; -/** - * See docs for _read - */ -ClientBidiObjectStream.prototype._read = _read; - -/** * _write implementation for both types of streams that allow writing - * @this {ClientWritableObjectStream|ClientBidiObjectStream} + * @this {ClientWritableObjectStream} * @param {*} chunk The value to write to the stream * @param {string} encoding Ignored * @param {function(Error)} callback Callback to call when finished writing */ function _write(chunk, encoding, callback) { - this._stream.write(this._serialize(chunk), encoding, callback); + this._stream.write(chunk, encoding, callback); } /** * See docs for _write */ ClientWritableObjectStream.prototype._write = _write; -/** - * See docs for _write - */ -ClientBidiObjectStream.prototype._write = _write; /** * Get a function that can make unary requests to the specified method. @@ -196,15 +152,16 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeUnaryRequest(argument, callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); var emitter = new EventEmitter(); forwardEvent(stream, emitter, 'status'); forwardEvent(stream, emitter, 'metadata'); - stream.write(serialize(argument)); + stream.write(argument); stream.end(); stream.on('data', function forwardData(chunk) { try { - callback(null, deserialize(chunk)); + callback(null, chunk); } catch (e) { callback(e); } @@ -236,11 +193,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeClientStreamRequest(callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientWritableObjectStream(stream, serialize, {}); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientWritableObjectStream(stream); stream.on('data', function forwardData(chunk) { try { - callback(null, deserialize(chunk)); + callback(null, chunk); } catch (e) { callback(e); } @@ -272,9 +230,10 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeServerStreamRequest(argument, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientReadableObjectStream(stream, deserialize, {}); - stream.write(serialize(argument)); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientReadableObjectStream(stream); + stream.write(argument); stream.end(); return obj_stream; } @@ -301,12 +260,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeBidiStreamRequest(metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientBidiObjectStream(stream, - serialize, - deserialize, - {}); - return obj_stream; + return client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); } return makeBidiStreamRequest; } |