aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-22 21:29:35 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-22 21:29:35 -0700
commit640dfaf2ee0172f64afa9b7ddf9b42f0b56f44ec (patch)
tree1358813aa44cb82a3c43007d64e7dcf47edc8ae2
parent70730b45a19b8518d33c444af64833c15540352e (diff)
parenta152b1cf390cdc024942ba8eb568f8e4f0dae036 (diff)
Merge github.com:grpc/grpc into you-complete-me
-rw-r--r--src/node/interop/interop_client.js6
-rw-r--r--src/node/src/client.js4
-rw-r--r--test/cpp/qps/server_async.cc15
3 files changed, 15 insertions, 10 deletions
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index 80f811901c..455055d9f3 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -154,13 +154,15 @@ function serverStreaming(client, done) {
arg.response_parameters[resp_index].size);
resp_index += 1;
});
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
+ call.on('end', function() {
assert.strictEqual(resp_index, 4);
if (done) {
done();
}
});
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, grpc.status.OK);
+ });
}
/**
diff --git a/src/node/src/client.js b/src/node/src/client.js
index efec05bbf3..65339406b2 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -125,10 +125,6 @@ function _read(size) {
self.finished = true;
return;
}
- if (self.finished) {
- self.push(null);
- return;
- }
var data = event.read;
if (self.push(self.deserialize(data)) && data !== null) {
var read_batch = {};
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 58499f345f..b9998405f6 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -99,12 +99,15 @@ class AsyncQpsServerTest : public Server {
while (srv_cq_->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
- if (ctx->RunNextState(ok) == false) {
+ bool still_going = ctx->RunNextState(ok);
+ std::lock_guard<std::mutex> g(shutdown_mutex_);
+ if (!shutdown_) {
// this RPC context is done, so refresh it
- std::lock_guard<std::mutex> g(shutdown_mutex_);
- if (!shutdown_) {
+ if (!still_going) {
ctx->Reset();
}
+ } else {
+ return;
}
}
return;
@@ -116,11 +119,15 @@ class AsyncQpsServerTest : public Server {
{
std::lock_guard<std::mutex> g(shutdown_mutex_);
shutdown_ = true;
- srv_cq_->Shutdown();
}
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
+ srv_cq_->Shutdown();
+ bool ok;
+ void *got_tag;
+ while (srv_cq_->Next(&got_tag, &ok))
+ ;
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();