diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-01-23 13:43:35 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-01-23 13:43:35 -0800 |
commit | 25290d29905800ce6a5f98761ad3e126ca167ce2 (patch) | |
tree | 8deaa190cf4492e6cd6920e7e68fef8dbb82060b /src/node/client.js | |
parent | 5f5d04146295c5c637fba40fb407abfa3ba7ef34 (diff) | |
parent | ab1103fbb35b5e8faef3d4696fbcd6c504dc93e8 (diff) |
Merge pull request #178 from murgatroid99/merge_new_invoke_api
Merge new invoke api
Diffstat (limited to 'src/node/client.js')
-rw-r--r-- | src/node/client.js | 99 |
1 files changed, 27 insertions, 72 deletions
diff --git a/src/node/client.js b/src/node/client.js index f913b06f29..2fefd14bbc 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -62,12 +62,9 @@ function GrpcClientStream(call, serialize, deserialize) { }; } var self = this; - // Indicates that we can start reading and have not received a null read - var can_read = false; + var finished = false; // Indicates that a read is currently pending var reading = false; - // Indicates that we can call startWrite - var can_write = false; // Indicates that a write is currently pending var writing = false; this._call = call; @@ -98,91 +95,46 @@ function GrpcClientStream(call, serialize, deserialize) { return deserialize(buffer); }; /** - * Callback to handle receiving a READ event. Pushes the data from that event - * onto the read queue and starts reading again if applicable. - * @param {grpc.Event} event The READ event object + * Callback to be called when a READ event is received. Pushes the data onto + * the read queue and starts reading again if applicable + * @param {grpc.Event} event READ event object */ function readCallback(event) { + if (finished) { + self.push(null); + return; + } var data = event.data; - if (self.push(self.deserialize(data))) { - if (data == null) { - // Disable starting to read after null read was received - can_read = false; - reading = false; - } else { - call.startRead(readCallback); - } + if (self.push(data) && data != null) { + self._call.startRead(readCallback); } else { - // Indicate that reading can be resumed by calling startReading reading = false; } - }; - /** - * Initiate a read, which continues until self.push returns false (indicating - * that reading should be paused) or data is null (indicating that there is no - * more data to read). - */ - function startReading() { - call.startRead(readCallback); - } - // TODO(mlumish): possibly change queue implementation due to shift slowness - var write_queue = []; - /** - * Write the next chunk of data in the write queue if there is one. Otherwise - * indicate that there is no pending write. When the write succeeds, this - * function is called again. - */ - function writeNext() { - if (write_queue.length > 0) { - writing = true; - var next = write_queue.shift(); - var writeCallback = function(event) { - next.callback(); - writeNext(); - }; - call.startWrite(self.serialize(next.chunk), writeCallback, 0); - } else { - writing = false; - } } - call.startInvoke(function(event) { - can_read = true; - can_write = true; - startReading(); - writeNext(); - }, function(event) { + call.invoke(function(event) { self.emit('metadata', event.data); }, function(event) { + finished = true; self.emit('status', event.data); }, 0); this.on('finish', function() { call.writesDone(function() {}); }); /** - * Indicate that reads should start, and start them if the INVOKE_ACCEPTED - * event has been received. + * Start reading if there is not already a pending read. Reading will + * continue until self.push returns false (indicating reads should slow + * down) or the read data is null (indicating that there is no more data). */ - this._enableRead = function() { - if (!reading) { - reading = true; - if (can_read) { - startReading(); + this.startReading = function() { + if (finished) { + self.push(null); + } else { + if (!reading) { + reading = true; + self._call.startRead(readCallback); } } }; - /** - * Push the chunk onto the write queue, and write from the write queue if - * there is not a pending write - * @param {Buffer} chunk The chunk of data to write - * @param {function(Error=)} callback The callback to call when the write - * completes - */ - this._tryWrite = function(chunk, callback) { - write_queue.push({chunk: chunk, callback: callback}); - if (can_write && !writing) { - writeNext(); - } - }; } /** @@ -191,7 +143,7 @@ function GrpcClientStream(call, serialize, deserialize) { * @param {number} size Ignored */ GrpcClientStream.prototype._read = function(size) { - this._enableRead(); + this.startReading(); }; /** @@ -202,7 +154,10 @@ GrpcClientStream.prototype._read = function(size) { * @param {function(Error=)} callback Ignored */ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { - this._tryWrite(chunk, callback); + var self = this; + self._call.startWrite(chunk, function(event) { + callback(); + }, 0); }; /** |