aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-04-27 18:23:42 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-04-27 18:23:42 -0700
commit320e6112b5831904c89364a193b52f70f8da4837 (patch)
tree2e7b45817b678410781bda30e91c77b15c612c68
parentea9fbb15b861f71c6fa0c7d78f786c74ce1698fb (diff)
parent17cc30c0b589fd265bede8ba66a0f564166244aa (diff)
Merge branch 'one-pass' of github.com:ctiller/grpc into one-pass
-rw-r--r--include/grpc++/client_context.h5
-rw-r--r--src/core/surface/completion_queue.c2
-rw-r--r--src/core/transport/chttp2_transport.c31
-rw-r--r--src/cpp/client/channel.cc2
-rw-r--r--src/cpp/client/channel.h1
-rw-r--r--test/core/end2end/dualstack_socket_test.c2
-rw-r--r--test/cpp/qps/client_sync.cc1
7 files changed, 36 insertions, 8 deletions
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 19630c9b54..a58e9872e6 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -35,6 +35,7 @@
#define GRPCXX_CLIENT_CONTEXT_H
#include <map>
+#include <memory>
#include <string>
#include <grpc/support/log.h>
@@ -126,9 +127,10 @@ class ClientContext {
friend class ::grpc::ClientAsyncResponseReader;
grpc_call* call() { return call_; }
- void set_call(grpc_call* call) {
+ void set_call(grpc_call* call, const std::shared_ptr<ChannelInterface>& channel) {
GPR_ASSERT(call_ == nullptr);
call_ = call;
+ channel_ = channel;
}
grpc_completion_queue* cq() { return cq_; }
@@ -137,6 +139,7 @@ class ClientContext {
grpc::string authority() { return authority_; }
bool initial_metadata_received_;
+ std::shared_ptr<ChannelInterface> channel_;
grpc_call* call_;
grpc_completion_queue* cq_;
gpr_timespec deadline_;
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 24f4a05071..f3c2453b5e 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -401,7 +401,9 @@ static void on_pollset_destroy_done(void *arg) {
}
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(cc->queue == NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 9f9005691b..26c550c1f1 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1890,13 +1890,22 @@ static void patch_metadata_ops(stream *s) {
size_t j;
size_t mdidx = 0;
size_t last_mdidx;
+ int found_metadata = 0;
+ /* rework the array of metadata into a linked list, making use
+ of the breadcrumbs we left in metadata batches during
+ add_metadata_batch */
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
+ found_metadata = 1;
+ /* we left a breadcrumb indicating where the end of this list is,
+ and since we add sequentially, we know from the end of the last
+ segment where this segment begins */
last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
GPR_ASSERT(last_mdidx > mdidx);
GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
+ /* turn the array into a doubly linked list */
op->data.metadata.list.head = &s->incoming_metadata[mdidx];
op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
for (j = mdidx + 1; j < last_mdidx; j++) {
@@ -1905,13 +1914,25 @@ static void patch_metadata_ops(stream *s) {
}
s->incoming_metadata[mdidx].prev = NULL;
s->incoming_metadata[last_mdidx-1].next = NULL;
+ /* track where we're up to */
mdidx = last_mdidx;
}
- GPR_ASSERT(mdidx == s->incoming_metadata_count);
- s->old_incoming_metadata = s->incoming_metadata;
- s->incoming_metadata = NULL;
- s->incoming_metadata_count = 0;
- s->incoming_metadata_capacity = 0;
+ if (found_metadata) {
+ s->old_incoming_metadata = s->incoming_metadata;
+ if (mdidx != s->incoming_metadata_count) {
+ /* we have a partially read metadata batch still in incoming_metadata */
+ size_t new_count = s->incoming_metadata_count - mdidx;
+ size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count;
+ GPR_ASSERT(mdidx < s->incoming_metadata_count);
+ s->incoming_metadata = gpr_malloc(copy_bytes);
+ memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes);
+ s->incoming_metadata_count = s->incoming_metadata_capacity = new_count;
+ } else {
+ s->incoming_metadata = NULL;
+ s->incoming_metadata_count = 0;
+ s->incoming_metadata_capacity = 0;
+ }
+ }
}
static void finish_reads(transport *t) {
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index ba8882278f..c541ddfb48 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -71,7 +71,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
: context->authority().c_str(),
context->raw_deadline());
GRPC_TIMER_MARK(CALL_CREATED, c_call);
- context->set_call(c_call);
+ context->set_call(c_call, shared_from_this());
return Call(c_call, this, cq);
}
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index cd239247c8..46009d20ba 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -51,6 +51,7 @@ class Credentials;
class StreamContextInterface;
class Channel GRPC_FINAL : public GrpcLibrary,
+ public std::enable_shared_from_this<Channel>,
public ChannelInterface {
public:
Channel(const grpc::string& target, grpc_channel* c_channel);
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index 29097661bc..7b3500233b 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -158,7 +158,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
cq_expect_finished_with_status(v_client, tag(3),
GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded", NULL);
- cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_ERROR);
+ cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_destroy(c);
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index aea5a0fb27..5dd64d0b13 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -105,6 +105,7 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
+ EndThreads();
if (stream_) {
SimpleResponse response;
stream_->WritesDone();