diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-05-22 21:29:35 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-05-22 21:29:35 -0700 |
commit | 640dfaf2ee0172f64afa9b7ddf9b42f0b56f44ec (patch) | |
tree | 1358813aa44cb82a3c43007d64e7dcf47edc8ae2 | |
parent | 70730b45a19b8518d33c444af64833c15540352e (diff) | |
parent | a152b1cf390cdc024942ba8eb568f8e4f0dae036 (diff) |
Merge github.com:grpc/grpc into you-complete-me
-rw-r--r-- | src/node/interop/interop_client.js | 6 | ||||
-rw-r--r-- | src/node/src/client.js | 4 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 15 |
3 files changed, 15 insertions, 10 deletions
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 80f811901c..455055d9f3 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -154,13 +154,15 @@ function serverStreaming(client, done) { arg.response_parameters[resp_index].size); resp_index += 1; }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); + call.on('end', function() { assert.strictEqual(resp_index, 4); if (done) { done(); } }); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + }); } /** diff --git a/src/node/src/client.js b/src/node/src/client.js index efec05bbf3..65339406b2 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -125,10 +125,6 @@ function _read(size) { self.finished = true; return; } - if (self.finished) { - self.push(null); - return; - } var data = event.read; if (self.push(self.deserialize(data)) && data !== null) { var read_batch = {}; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 58499f345f..b9998405f6 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -99,12 +99,15 @@ class AsyncQpsServerTest : public Server { while (srv_cq_->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke - if (ctx->RunNextState(ok) == false) { + bool still_going = ctx->RunNextState(ok); + std::lock_guard<std::mutex> g(shutdown_mutex_); + if (!shutdown_) { // this RPC context is done, so refresh it - std::lock_guard<std::mutex> g(shutdown_mutex_); - if (!shutdown_) { + if (!still_going) { ctx->Reset(); } + } else { + return; } } return; @@ -116,11 +119,15 @@ class AsyncQpsServerTest : public Server { { std::lock_guard<std::mutex> g(shutdown_mutex_); shutdown_ = true; - srv_cq_->Shutdown(); } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } + srv_cq_->Shutdown(); + bool ok; + void *got_tag; + while (srv_cq_->Next(&got_tag, &ok)) + ; while (!contexts_.empty()) { delete contexts_.front(); contexts_.pop_front(); |