author     Craig Tiller <ctiller@google.com>  2015-05-04 15:56:05 -0700
committer  Craig Tiller <ctiller@google.com>  2015-05-04 15:56:05 -0700
commit     b7d1608a728acce764bb0cab3230e75ebd83f95f (patch)
tree       2890c264ea92967f42cc5efa85945783c13b5f5a
parent     cfda2bdd43570e225c6a58aed22d1f1c78ff501a (diff)
parent     6dc550287163b7a1ac31808418a8102d794cc238 (diff)
Merge github.com:grpc/grpc into seqno2
-rw-r--r--  .travis.yml                               1
-rw-r--r--  gRPC.podspec                              2
-rw-r--r--  src/core/channel/call_op_string.c       141
-rw-r--r--  src/core/iomgr/tcp_posix.c              156
-rw-r--r--  src/core/surface/call.c                   7
-rw-r--r--  src/core/surface/init.c                   1
-rw-r--r--  src/core/transport/chttp2_transport.c    29
-rw-r--r--  src/core/transport/chttp2_transport.h     1
-rw-r--r--  test/cpp/qps/client_async.cc            133
-rwxr-xr-x  tools/gce_setup/cloud_prod_runner.sh      2
10 files changed, 176 insertions, 297 deletions
diff --git a/.travis.yml b/.travis.yml
index 1f96c6fc00..c1086c1765 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,7 +25,6 @@ env:
- CONFIG=opt TEST=python
- CONFIG=opt TEST=csharp
- USE_GCC=4.4 CONFIG=opt TEST=build
- - USE_GCC=4.5 CONFIG=opt TEST=build
script:
- rvm use $RUBY_VERSION
- gem install bundler
diff --git a/gRPC.podspec b/gRPC.podspec
index fe5fe2cc18..418330f5ba 100644
--- a/gRPC.podspec
+++ b/gRPC.podspec
@@ -4,7 +4,7 @@ Pod::Spec.new do |s|
s.summary = 'Generic gRPC client library for iOS'
s.homepage = 'https://www.grpc.io'
s.license = 'New BSD'
- s.authors = { 'Jorge Canizales' => 'jcanizales@google.com'
+ s.authors = { 'Jorge Canizales' => 'jcanizales@google.com',
'Michael Lumish' => 'mlumish@google.com' }
# s.source = { :git => 'https://github.com/grpc/grpc.git', :tag => 'release-0_5_0' }
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c
deleted file mode 100644
index 5f7e1be268..0000000000
--- a/src/core/channel/call_op_string.c
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/channel_stack.h"
-
-#include <stdarg.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "src/core/support/string.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/useful.h>
-
-static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
- gpr_strvec_add(b, gpr_strdup(" key="));
- gpr_strvec_add(
- b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
- GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
-
- gpr_strvec_add(b, gpr_strdup(" value="));
- gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
- GPR_SLICE_LENGTH(md->value->slice),
- GPR_HEXDUMP_PLAINTEXT));
-}
-
-static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
- grpc_linked_mdelem *m;
- for (m = md.list.head; m != NULL; m = m->next) {
- put_metadata(b, m->md);
- }
- if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
- char *tmp;
- gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
- md.deadline.tv_nsec);
- gpr_strvec_add(b, tmp);
- }
-}
-
-char *grpc_call_op_string(grpc_call_op *op) {
- char *tmp;
- char *out;
-
- gpr_strvec b;
- gpr_strvec_init(&b);
-
- switch (op->dir) {
- case GRPC_CALL_DOWN:
- gpr_strvec_add(&b, gpr_strdup(">"));
- break;
- case GRPC_CALL_UP:
- gpr_strvec_add(&b, gpr_strdup("<"));
- break;
- }
- switch (op->type) {
- case GRPC_SEND_METADATA:
- gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
- put_metadata_list(&b, op->data.metadata);
- break;
- case GRPC_SEND_MESSAGE:
- gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
- break;
- case GRPC_SEND_PREFORMATTED_MESSAGE:
- gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE"));
- break;
- case GRPC_SEND_FINISH:
- gpr_strvec_add(&b, gpr_strdup("SEND_FINISH"));
- break;
- case GRPC_REQUEST_DATA:
- gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA"));
- break;
- case GRPC_RECV_METADATA:
- gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
- put_metadata_list(&b, op->data.metadata);
- break;
- case GRPC_RECV_MESSAGE:
- gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
- break;
- case GRPC_RECV_HALF_CLOSE:
- gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE"));
- break;
- case GRPC_RECV_FINISH:
- gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
- break;
- case GRPC_RECV_SYNTHETIC_STATUS:
- gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'",
- op->data.synthetic_status.status,
- op->data.synthetic_status.message);
- gpr_strvec_add(&b, tmp);
- break;
- case GRPC_CANCEL_OP:
- gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
- break;
- }
- gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
- gpr_strvec_add(&b, tmp);
- if (op->bind_pollset) {
- gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
- }
-
- out = gpr_strvec_flatten(&b, NULL);
- gpr_strvec_destroy(&b);
-
- return out;
-}
-
-void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_call_op *op) {
- char *str = grpc_call_op_string(op);
- gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
- gpr_free(str);
-}
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 40897fb8f8..6b21bcf6a9 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -258,6 +258,8 @@ typedef struct {
grpc_endpoint base;
grpc_fd *em_fd;
int fd;
+ int iov_size; /* Number of slices to allocate per read attempt */
+ int finished_edge;
size_t slice_size;
gpr_refcount refcount;
@@ -315,9 +317,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
- grpc_tcp *tcp = (grpc_tcp *)arg;
- int iov_size = 1;
+static void grpc_tcp_continue_read(grpc_tcp *tcp) {
gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
@@ -327,88 +327,103 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
gpr_slice *final_slices;
size_t final_nslices;
+ GPR_ASSERT(!tcp->finished_edge);
GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0);
- if (!success) {
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
- grpc_tcp_unref(tcp);
- return;
+ allocated_bytes = slice_state_append_blocks_into_iovec(
+ &read_state, iov, tcp->iov_size, tcp->slice_size);
+
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_iov = iov;
+ msg.msg_iovlen = tcp->iov_size;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_flags = 0;
+
+ GRPC_TIMER_MARK(RECVMSG_BEGIN, 0);
+ do {
+ read_bytes = recvmsg(tcp->fd, &msg, 0);
+ } while (read_bytes < 0 && errno == EINTR);
+ GRPC_TIMER_MARK(RECVMSG_END, 0);
+
+ if (read_bytes < allocated_bytes) {
+ /* TODO(klempner): Consider a second read first, in hopes of getting a
+ * quick EAGAIN and saving a bunch of allocations. */
+ slice_state_remove_last(&read_state, read_bytes < 0
+ ? allocated_bytes
+ : allocated_bytes - read_bytes);
}
- /* TODO(klempner): Limit the amount we read at once. */
- for (;;) {
- allocated_bytes = slice_state_append_blocks_into_iovec(
- &read_state, iov, iov_size, tcp->slice_size);
-
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_iov = iov;
- msg.msg_iovlen = iov_size;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
- msg.msg_flags = 0;
-
- GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
- do {
- read_bytes = recvmsg(tcp->fd, &msg, 0);
- } while (read_bytes < 0 && errno == EINTR);
- GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
-
- if (read_bytes < allocated_bytes) {
- /* TODO(klempner): Consider a second read first, in hopes of getting a
- * quick EAGAIN and saving a bunch of allocations. */
- slice_state_remove_last(&read_state, read_bytes < 0
- ? allocated_bytes
- : allocated_bytes - read_bytes);
- }
-
- if (read_bytes < 0) {
- /* NB: After calling the user_cb a parallel call of the read handler may
- * be running. */
- if (errno == EAGAIN) {
- if (slice_state_has_available(&read_state)) {
- /* TODO(klempner): We should probably do the call into the application
- without all this junk on the stack */
- /* FIXME(klempner): Refcount properly */
- slice_state_transfer_ownership(&read_state, &final_slices,
- &final_nslices);
- call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
- } else {
- /* Spurious read event, consume it here */
- slice_state_destroy(&read_state);
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
- }
- } else {
- /* TODO(klempner): Log interesting errors */
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
+ if (read_bytes < 0) {
+ /* NB: After calling the user_cb a parallel call of the read handler may
+ * be running. */
+ if (errno == EAGAIN) {
+ if (tcp->iov_size > 1) {
+ tcp->iov_size /= 2;
}
- return;
- } else if (read_bytes == 0) {
- /* 0 read size ==> end of stream */
if (slice_state_has_available(&read_state)) {
- /* there were bytes already read: pass them up to the application */
+ /* TODO(klempner): We should probably do the call into the application
+ without all this junk on the stack */
+ /* FIXME(klempner): Refcount properly */
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
- call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
+ tcp->finished_edge = 1;
+ call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+ slice_state_destroy(&read_state);
+ grpc_tcp_unref(tcp);
} else {
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
+ /* We've consumed the edge, request a new one */
+ slice_state_destroy(&read_state);
+ grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
}
+ } else {
+ /* TODO(klempner): Log interesting errors */
+ call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
- return;
- } else if (iov_size < MAX_READ_IOVEC) {
- ++iov_size;
}
+ } else if (read_bytes == 0) {
+ /* 0 read size ==> end of stream */
+ if (slice_state_has_available(&read_state)) {
+ /* there were bytes already read: pass them up to the application */
+ slice_state_transfer_ownership(&read_state, &final_slices,
+ &final_nslices);
+ call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
+ } else {
+ call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
+ }
+ slice_state_destroy(&read_state);
+ grpc_tcp_unref(tcp);
+ } else {
+ if (tcp->iov_size < MAX_READ_IOVEC) {
+ ++tcp->iov_size;
+ }
+ GPR_ASSERT(slice_state_has_available(&read_state));
+ slice_state_transfer_ownership(&read_state, &final_slices,
+ &final_nslices);
+ call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+ slice_state_destroy(&read_state);
+ grpc_tcp_unref(tcp);
}
+
GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
}
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+ grpc_tcp *tcp = (grpc_tcp *)arg;
+ GPR_ASSERT(!tcp->finished_edge);
+
+ if (!success) {
+ call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+ grpc_tcp_unref(tcp);
+ } else {
+ grpc_tcp_continue_read(tcp);
+ }
+}
+
static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
void *user_data) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -416,7 +431,12 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->read_cb = cb;
tcp->read_user_data = user_data;
gpr_ref(&tcp->refcount);
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+ if (tcp->finished_edge) {
+ tcp->finished_edge = 0;
+ grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+ } else {
+ grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp);
+ }
}
#define MAX_WRITE_IOVEC 16
@@ -554,6 +574,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
tcp->read_user_data = NULL;
tcp->write_user_data = NULL;
tcp->slice_size = slice_size;
+ tcp->iov_size = 1;
+ tcp->finished_edge = 1;
slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
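
The tcp_posix.c rewrite splits the old read loop into grpc_tcp_continue_read plus a thin grpc_tcp_handle_read wrapper, and replaces the loop-local iov_size with a per-endpoint field that adapts to traffic: it grows by one after any read that returns data (up to MAX_READ_IOVEC) and halves on EAGAIN. The new finished_edge flag records whether the edge-triggered readability event has been drained: a read ending in EAGAIN sets it, so the next notify_on_read re-arms the poller; a read that returns data without hitting EAGAIN leaves it clear, so notify_on_read schedules the handler directly via grpc_iomgr_add_callback rather than wait on an edge that may never re-fire. A minimal sketch of the adaptive sizing, assuming a bare nonblocking socket instead of grpc's endpoint and slice machinery (kSliceSize and the flat buffer array are stand-ins):

// Sketch only: adaptive iovec sizing for edge-triggered reads.
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <cerrno>
#include <cstddef>

static const int kMaxReadIovec = 4;    // mirrors MAX_READ_IOVEC
static const size_t kSliceSize = 8192; // hypothetical slice size

struct AdaptiveReader {
  int fd;
  int iov_size;  // starts at 1, as in grpc_tcp_create
};

// Returns recvmsg's result: >0 bytes read, 0 on EOF, -1 with errno set.
ssize_t continue_read(AdaptiveReader* r, char bufs[][kSliceSize]) {
  struct iovec iov[kMaxReadIovec];
  for (int i = 0; i < r->iov_size; i++) {
    iov[i].iov_base = bufs[i];
    iov[i].iov_len = kSliceSize;
  }
  struct msghdr msg = {};
  msg.msg_iov = iov;
  msg.msg_iovlen = r->iov_size;

  ssize_t n;
  do {
    n = recvmsg(r->fd, &msg, 0);  // retry on signal interruption
  } while (n < 0 && errno == EINTR);

  if (n < 0 && errno == EAGAIN) {
    // Over-provisioned: shrink so idle connections stop allocating
    // buffers they never fill.
    if (r->iov_size > 1) r->iov_size /= 2;
  } else if (n > 0 && r->iov_size < kMaxReadIovec) {
    ++r->iov_size;  // data is flowing: read more per syscall next time
  }
  return n;
}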
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 136921656f..070be1b25a 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -34,6 +34,7 @@
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
+#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
#include "src/core/surface/channel.h"
@@ -405,14 +406,14 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
- is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
+ (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
- call->read_state != READ_STATE_STREAM_CLOSED);
+ call->read_state < READ_STATE_GOT_INITIAL_METADATA);
}
static void unlock(grpc_call *call) {
@@ -685,6 +686,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc;
size_t i;
+ GRPC_TIMER_MARK(CALL_ON_DONE_RECV_BEGIN, 0);
lock(call);
call->receiving = 0;
if (success) {
@@ -729,6 +731,7 @@ static void call_on_done_recv(void *pc, int success) {
unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
+ GRPC_TIMER_MARK(CALL_ON_DONE_RECV_END, 0);
}
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
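
The surface/call.c change makes need_more_data lazier in two ways: an outstanding RECV_MESSAGE now pulls from the transport only when the incoming byte-buffer queue is empty (mirroring the existing RECV_CLOSE condition), and a server only forces reads until the client's initial metadata arrives (read_state < READ_STATE_GOT_INITIAL_METADATA) rather than until the stream closes. A reduced sketch of that demand predicate, with hypothetical stand-ins for the grpc_call internals and with the other is_op_live clauses omitted:

// Reduced sketch; CallView and ReadState are hypothetical stand-ins.
enum ReadState { READ_INITIAL, READ_GOT_INITIAL_METADATA, READ_STREAM_CLOSED };

struct CallView {
  bool recv_message_live;     // is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)
  bool recv_close_live;       // is_op_live(call, GRPC_IOREQ_RECV_CLOSE)
  bool incoming_queue_empty;  // grpc_bbq_empty(&call->incoming_queue)
  bool is_client;
  bool write_state_initial;
  ReadState read_state;
};

bool need_more_data(const CallView& c) {
  // Don't touch the transport while buffered messages can satisfy the app.
  return (c.recv_message_live && c.incoming_queue_empty) ||
         (c.recv_close_live && c.incoming_queue_empty) ||
         // A server must keep reading until initial metadata arrives,
         // but no longer (previously: until the stream closed).
         (c.write_state_initial && !c.is_client &&
          c.read_state < READ_GOT_INITIAL_METADATA);
}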
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index bfee28e5fc..d6eb9b2c24 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -59,6 +59,7 @@ void grpc_init(void) {
grpc_register_tracer("channel", &grpc_trace_channel);
grpc_register_tracer("surface", &grpc_surface_trace);
grpc_register_tracer("http", &grpc_http_trace);
+ grpc_register_tracer("flowctl", &grpc_flowctl_trace);
grpc_register_tracer("batch", &grpc_trace_batch);
grpc_security_pre_init();
grpc_iomgr_init();
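
grpc_init registers the new flowctl tracer alongside the existing ones; grpc_register_tracer associates a name with an int flag that tracing code tests at runtime. A sketch of that registry shape, where the activation step is an assumption about the surrounding machinery not shown in this diff (grpc reads a comma-separated GRPC_TRACE environment variable):

// Sketch of a name -> flag tracer registry, grpc_register_tracer style.
#include <cstdlib>
#include <map>
#include <sstream>
#include <string>

static std::map<std::string, int*> g_tracers;

void register_tracer(const char* name, int* flag) {
  g_tracers[name] = flag;  // flag stays 0 unless explicitly enabled
}

void enable_tracers_from_env(const char* env_name) {
  const char* v = std::getenv(env_name);
  if (v == nullptr) return;
  std::stringstream ss(v);
  std::string item;
  while (std::getline(ss, item, ',')) {  // comma-separated tracer names
    auto it = g_tracers.find(item);
    if (it != g_tracers.end()) *it->second = 1;
  }
}

// Usage mirroring the diff:
//   extern int grpc_flowctl_trace;
//   register_tracer("flowctl", &grpc_flowctl_trace);
//   enable_tracers_from_env("GRPC_TRACE");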
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 6d3a825f2e..e277b36b93 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -37,6 +37,7 @@
#include <stdio.h>
#include <string.h>
+#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
@@ -66,6 +67,7 @@
#define CLIENT_CONNECT_STRLEN 24
int grpc_http_trace = 0;
+int grpc_flowctl_trace = 0;
typedef struct transport transport;
typedef struct stream stream;
@@ -76,6 +78,12 @@ typedef struct stream stream;
else \
stmt
+#define FLOWCTL_TRACE(t, obj, dir, id, delta) \
+ if (!grpc_flowctl_trace) \
+ ; \
+ else \
+ flowctl_trace(t, #dir, obj->dir##_window, id, delta)
+
/* streams are kept in various linked lists depending on what things need to
happen to them... this enum labels each list */
typedef enum {
@@ -384,6 +392,12 @@ static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
static void add_metadata_batch(transport *t, stream *s);
+static void flowctl_trace(transport *t, const char *flow, gpr_int32 window,
+ gpr_uint32 id, gpr_int32 delta) {
+ gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window,
+ delta, window + delta);
+}
+
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -787,6 +801,8 @@ static void unlock(transport *t) {
grpc_stream_op_buffer nuke_now;
const grpc_transport_callbacks *cb = t->cb;
+ GRPC_TIMER_MARK(HTTP2_UNLOCK_BEGIN, 0);
+
grpc_sopb_init(&nuke_now);
if (t->nuke_later_sopb.nops) {
grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
@@ -835,6 +851,8 @@ static void unlock(transport *t) {
/* finally unlock */
gpr_mu_unlock(&t->mu);
+ GRPC_TIMER_MARK(HTTP2_UNLOCK_CLEANUP, 0);
+
/* perform some callbacks if necessary */
for (i = 0; i < num_goaways; i++) {
cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
@@ -865,6 +883,8 @@ static void unlock(transport *t) {
grpc_sopb_destroy(&nuke_now);
gpr_free(goaways);
+
+ GRPC_TIMER_MARK(HTTP2_UNLOCK_END, 0);
}
/*
@@ -911,6 +931,8 @@ static int prepare_write(transport *t) {
window_delta = grpc_chttp2_preencode(
s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
+ FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
+ FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;
@@ -939,6 +961,7 @@ static int prepare_write(transport *t) {
if (!s->read_closed && window_delta) {
gpr_slice_buffer_add(
&t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
+ FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
s->incoming_window += window_delta;
}
}
@@ -948,6 +971,7 @@ static int prepare_write(transport *t) {
window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(0, window_delta));
+ FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
t->incoming_window += window_delta;
}
@@ -1301,6 +1325,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
return GRPC_CHTTP2_CONNECTION_ERROR;
}
+ FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size);
+ FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size);
t->incoming_window -= t->incoming_frame_size;
s->incoming_window -= t->incoming_frame_size;
@@ -1641,6 +1667,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
for (i = 0; i < t->stream_map.count; i++) {
stream *s = (stream *)(t->stream_map.values[i]);
int was_window_empty = s->outgoing_window <= 0;
+ FLOWCTL_TRACE(t, s, outgoing, s->id, st.initial_window_update);
s->outgoing_window += st.initial_window_update;
if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
s->outgoing_sopb->nops > 0) {
@@ -1659,6 +1686,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
GRPC_CHTTP2_FLOW_CONTROL_ERROR),
GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else {
+ FLOWCTL_TRACE(t, s, outgoing, s->id, st.window_update);
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
flag that */
@@ -1673,6 +1701,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
drop_connection(t);
} else {
+ FLOWCTL_TRACE(t, t, outgoing, 0, st.window_update);
t->outgoing_window += st.window_update;
}
}
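
Two details of the FLOWCTL_TRACE macro are worth noting. The "if (!grpc_flowctl_trace) ; else stmt" shape (the same one the tracing macro just above it uses) expands to one complete if/else statement, so a trailing else at the call site cannot silently bind to the macro's hidden if, which is exactly what would happen with a plain "if (flag) stmt" macro. And #dir / obj->dir##_window use stringizing and token pasting so a single macro covers both the incoming and outgoing windows. A compressed sketch of the idiom, where trace_enabled and Conn stand in for grpc_flowctl_trace and the transport/stream structs:

// Sketch of the trace-gating macro idiom.
#include <cstdio>

static int trace_enabled = 0;

struct Conn {
  int incoming_window;
  int outgoing_window;
};

// Expands to one complete if/else, so an else at the call site
// cannot capture the macro's hidden if. #dir stringizes the
// direction; dir##_window pastes the field name.
#define FLOW_TRACE(obj, dir, delta)                                   \
  if (!trace_enabled)                                                 \
    ;                                                                 \
  else                                                                \
    std::printf("FLOW:%s: %d + %d = %d\n", #dir, (obj)->dir##_window, \
                (int)(delta), (obj)->dir##_window + (int)(delta))

void consume_frame(Conn* c, int frame_size) {
  if (frame_size > 0)
    FLOW_TRACE(c, incoming, -frame_size);  // safe even in this unbraced if
  else
    std::printf("empty frame\n");  // still binds to the outer if
  c->incoming_window -= frame_size;
}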
diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h
index a7f1b9a864..fad714fabf 100644
--- a/src/core/transport/chttp2_transport.h
+++ b/src/core/transport/chttp2_transport.h
@@ -38,6 +38,7 @@
#include "src/core/transport/transport.h"
extern int grpc_http_trace;
+extern int grpc_flowctl_trace;
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 0a6d9beeca..0aec1b1a57 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -130,39 +130,26 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
response_reader_;
};
-class AsyncUnaryClient GRPC_FINAL : public Client {
+class AsyncClient : public Client {
public:
- explicit AsyncUnaryClient(const ClientConfig& config) : Client(config) {
+ explicit AsyncClient(const ClientConfig& config,
+ std::function<void(CompletionQueue*, TestService::Stub*,
+ const SimpleRequest&)> setup_ctx) :
+ Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue);
}
-
- auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
- auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
- const SimpleRequest& request, void* tag) {
- return stub->AsyncUnaryCall(ctx, request, cq, tag);
- };
-
- TestService::Stub* stub = channel->get_stub();
- const SimpleRequest& request = request_;
- new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
- stub, request, start_req, check_done);
+ setup_ctx(cq, channel->get_stub(), request_);
}
}
-
- StartThreads(config.async_client_threads());
}
-
- ~AsyncUnaryClient() GRPC_OVERRIDE {
- EndThreads();
-
+ virtual ~AsyncClient() {
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
void* got_tag;
@@ -173,10 +160,13 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
}
}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(Histogram* histogram, size_t thread_idx)
+ GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) {
+ switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok,
+ std::chrono::system_clock::now() +
+ std::chrono::seconds(1))) {
case CompletionQueue::SHUTDOWN: return false;
case CompletionQueue::TIMEOUT: return true;
case CompletionQueue::GOT_EVENT: break;
@@ -192,10 +182,30 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
return true;
}
-
+ private:
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
};
+class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
+ public:
+ explicit AsyncUnaryClient(const ClientConfig& config) :
+ AsyncClient(config, SetupCtx) {
+ StartThreads(config.async_client_threads());
+ }
+ ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+private:
+ static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
+ const SimpleRequest& req) {
+ auto check_done = [](grpc::Status s, SimpleResponse* response) {};
+ auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
+ const SimpleRequest& request, void* tag) {
+ return stub->AsyncUnaryCall(ctx, request, cq, tag);
+ };
+ new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
+ stub, req, start_req, check_done);
+ }
+};
+
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
@@ -241,7 +251,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return(false);
}
next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
- stream_->Read(&response_, ClientRpcContext::tag(this));
+ stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
}
bool ReadDone(bool ok, Histogram *hist) {
@@ -263,71 +273,26 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
stream_;
};
-class AsyncStreamingClient GRPC_FINAL : public Client {
+class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
- explicit AsyncStreamingClient(const ClientConfig &config) : Client(config) {
- for (int i = 0; i < config.async_client_threads(); i++) {
- cli_cqs_.emplace_back(new CompletionQueue);
- }
-
- auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-
- int t = 0;
- for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
- for (auto channel = channels_.begin(); channel != channels_.end();
- channel++) {
- auto* cq = cli_cqs_[t].get();
- t = (t + 1) % cli_cqs_.size();
- auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
- void *tag) {
- auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
- return stream;
- };
-
- TestService::Stub *stub = channel->get_stub();
- const SimpleRequest &request = request_;
- new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- stub, request, start_req, check_done);
- }
- }
-
+ explicit AsyncStreamingClient(const ClientConfig &config) :
+ AsyncClient(config, SetupCtx) {
StartThreads(config.async_client_threads());
}
- ~AsyncStreamingClient() GRPC_OVERRIDE {
- EndThreads();
-
- for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
- (*cq)->Shutdown();
- void *got_tag;
- bool ok;
- while ((*cq)->Next(&got_tag, &ok)) {
- delete ClientRpcContext::detag(got_tag);
- }
- }
- }
-
- bool ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE {
- void *got_tag;
- bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) {
- case CompletionQueue::SHUTDOWN: return false;
- case CompletionQueue::TIMEOUT: return true;
- case CompletionQueue::GOT_EVENT: break;
- }
-
- ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
- if (ctx->RunNextState(ok, histogram) == false) {
- // call the callback and then delete it
- ctx->RunNextState(ok, histogram);
- ctx->StartNewClone();
- delete ctx;
- }
-
- return true;
+ ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+private:
+ static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
+ const SimpleRequest& req) {
+ auto check_done = [](grpc::Status s, SimpleResponse* response) {};
+ auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
+ void *tag) {
+ auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
+ return stream;
+ };
+ new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
+ stub, req, start_req, check_done);
}
-
- std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
};
std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
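
The client_async.cc change is a standard lift of duplicated machinery: completion-queue construction, round-robin assignment of RPC contexts to queues, shutdown-and-drain in the destructor, and the ThreadFunc polling loop all move into a shared AsyncClient base, while the one genuinely per-variant step, creating an RPC context, is injected through a std::function constructor argument that each subclass satisfies with a static SetupCtx. Note that StartThreads/EndThreads stay in the leaf classes, so worker threads start only after the subclass constructor finishes and stop before the base destructor shuts the queues down. A reduced sketch of the shape, where Stub, CompletionQueue, and the setup signature are simplified stand-ins, not the real gRPC types:

// Sketch of the base-class-with-injected-setup refactor.
#include <functional>
#include <memory>
#include <vector>

struct Stub {};
struct CompletionQueue {
  void Shutdown() {}
  bool Next(void** tag, bool* ok) { return false; }  // drains immediately
};

class AsyncClientBase {
 public:
  using SetupCtx = std::function<void(CompletionQueue*, Stub*)>;

  AsyncClientBase(int num_cqs, const std::vector<Stub*>& stubs,
                  SetupCtx setup) {
    for (int i = 0; i < num_cqs; i++) cqs_.emplace_back(new CompletionQueue);
    int t = 0;  // round-robin contexts across queues, as in the diff
    for (Stub* stub : stubs) {
      setup(cqs_[t].get(), stub);
      t = (t + 1) % static_cast<int>(cqs_.size());
    }
  }

  virtual ~AsyncClientBase() {
    for (auto& cq : cqs_) {  // the shutdown/drain loop, written once
      cq->Shutdown();
      void* tag;
      bool ok;
      while (cq->Next(&tag, &ok)) { /* delete the detagged context */ }
    }
  }

 private:
  std::vector<std::unique_ptr<CompletionQueue>> cqs_;
};

class UnaryClient : public AsyncClientBase {
 public:
  explicit UnaryClient(const std::vector<Stub*>& stubs)
      : AsyncClientBase(2, stubs, SetupCtx) {}
  // real code: StartThreads(...) here, EndThreads() in the destructor

 private:
  static void SetupCtx(CompletionQueue* cq, Stub* stub) {
    // variant-specific: allocate the unary (or streaming) RPC context
  }
};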
diff --git a/tools/gce_setup/cloud_prod_runner.sh b/tools/gce_setup/cloud_prod_runner.sh
index 0389ac9dc1..812be4061c 100755
--- a/tools/gce_setup/cloud_prod_runner.sh
+++ b/tools/gce_setup/cloud_prod_runner.sh
@@ -38,7 +38,7 @@ main() {
source grpc_docker.sh
test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response)
auth_test_cases=(service_account_creds compute_engine_creds jwt_token_creds)
- clients=(cxx java go ruby node csharp_mono python)
+ clients=(cxx java go ruby node csharp_mono python php)
for test_case in "${test_cases[@]}"
do
for client in "${clients[@]}"