From 2b1fd661afc94b871bb200721bf96e6cd5477808 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 28 Apr 2015 07:33:37 -0700 Subject: Hoist iov_size into a grpc_tcp member --- src/core/iomgr/tcp_posix.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 86721e9c95..f5f30cac36 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -258,6 +258,7 @@ typedef struct { grpc_endpoint base; grpc_fd *em_fd; int fd; + int iov_size; /* Number of slices to allocate per read attempt */ size_t slice_size; gpr_refcount refcount; @@ -317,7 +318,6 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, #define MAX_READ_IOVEC 4 static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; - int iov_size = 1; gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; @@ -340,12 +340,12 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { /* TODO(klempner): Limit the amount we read at once. */ for (;;) { allocated_bytes = slice_state_append_blocks_into_iovec( - &read_state, iov, iov_size, tcp->slice_size); + &read_state, iov, tcp->iov_size, tcp->slice_size); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; - msg.msg_iovlen = iov_size; + msg.msg_iovlen = tcp->iov_size; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; @@ -368,6 +368,9 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { /* NB: After calling the user_cb a parallel call of the read handler may * be running. */ if (errno == EAGAIN) { + if (tcp->iov_size > 1) { + tcp->iov_size /= 2; + } if (slice_state_has_available(&read_state)) { /* TODO(klempner): We should probably do the call into the application without all this junk on the stack */ @@ -402,8 +405,8 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { slice_state_destroy(&read_state); grpc_tcp_unref(tcp); return; - } else if (iov_size < MAX_READ_IOVEC) { - ++iov_size; + } else if (tcp->iov_size < MAX_READ_IOVEC) { + ++tcp->iov_size; } } GRPC_TIMER_MARK(HANDLE_READ_END, 0); @@ -554,6 +557,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { tcp->read_user_data = NULL; tcp->write_user_data = NULL; tcp->slice_size = slice_size; + tcp->iov_size = 1; slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); -- cgit v1.2.3 From 89d304a3013652089890d87ea0b3a903a4920947 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 28 Apr 2015 09:20:58 -0700 Subject: Fix test for multiple reads Previously this test assumed all bytes would be read in one request. Also update test to use grpc_init() so that we can enable tracing. --- test/core/iomgr/tcp_posix_test.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 59e525a8e1..40abed5f6e 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -40,6 +40,7 @@ #include #include +#include #include #include #include @@ -140,11 +141,12 @@ static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { struct read_socket_state *state = (struct read_socket_state *)user_data; ssize_t read_bytes; - int current_data = 0; + int current_data; GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); gpr_mu_lock(&state->mu); + current_data = state->read_bytes % 256; read_bytes = count_and_unref_slices(slices, nslices, ¤t_data); state->read_bytes += read_bytes; gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, @@ -483,10 +485,10 @@ static grpc_endpoint_test_config configs[] = { int main(int argc, char **argv) { grpc_test_init(argc, argv); - grpc_iomgr_init(); + grpc_init(); run_tests(); grpc_endpoint_tests(configs[0]); - grpc_iomgr_shutdown(); + grpc_shutdown(); return 0; } -- cgit v1.2.3 From 5c07cce72da4177704df654527781e7fb905cb74 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 28 Apr 2015 09:21:58 -0700 Subject: Don't wait for EAGAIN before reporting reads up --- src/core/iomgr/tcp_posix.c | 154 +++++++++++++++++++++++++-------------------- 1 file changed, 86 insertions(+), 68 deletions(-) diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index f5f30cac36..3825713aba 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -259,6 +259,7 @@ typedef struct { grpc_fd *em_fd; int fd; int iov_size; /* Number of slices to allocate per read attempt */ + int finished_edge; size_t slice_size; gpr_refcount refcount; @@ -316,8 +317,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, #define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { - grpc_tcp *tcp = (grpc_tcp *)arg; +static void grpc_tcp_continue_read(grpc_tcp *tcp) { gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; @@ -327,91 +327,103 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { gpr_slice *final_slices; size_t final_nslices; + GPR_ASSERT(!tcp->finished_edge); GRPC_TIMER_MARK(HANDLE_READ_BEGIN, 0); slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, 0); - if (!success) { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); - grpc_tcp_unref(tcp); - return; + allocated_bytes = slice_state_append_blocks_into_iovec( + &read_state, iov, tcp->iov_size, tcp->slice_size); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = tcp->iov_size; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + GRPC_TIMER_MARK(RECVMSG_BEGIN, 0); + do { + read_bytes = recvmsg(tcp->fd, &msg, 0); + } while (read_bytes < 0 && errno == EINTR); + GRPC_TIMER_MARK(RECVMSG_END, 0); + + if (read_bytes < allocated_bytes) { + /* TODO(klempner): Consider a second read first, in hopes of getting a + * quick EAGAIN and saving a bunch of allocations. */ + slice_state_remove_last(&read_state, read_bytes < 0 + ? allocated_bytes + : allocated_bytes - read_bytes); } - /* TODO(klempner): Limit the amount we read at once. */ - for (;;) { - allocated_bytes = slice_state_append_blocks_into_iovec( - &read_state, iov, tcp->iov_size, tcp->slice_size); - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = tcp->iov_size; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - GRPC_TIMER_MARK(RECVMSG_BEGIN, 0); - do { - read_bytes = recvmsg(tcp->fd, &msg, 0); - } while (read_bytes < 0 && errno == EINTR); - GRPC_TIMER_MARK(RECVMSG_END, 0); - - if (read_bytes < allocated_bytes) { - /* TODO(klempner): Consider a second read first, in hopes of getting a - * quick EAGAIN and saving a bunch of allocations. */ - slice_state_remove_last(&read_state, read_bytes < 0 - ? allocated_bytes - : allocated_bytes - read_bytes); - } - - if (read_bytes < 0) { - /* NB: After calling the user_cb a parallel call of the read handler may - * be running. */ - if (errno == EAGAIN) { - if (tcp->iov_size > 1) { - tcp->iov_size /= 2; - } - if (slice_state_has_available(&read_state)) { - /* TODO(klempner): We should probably do the call into the application - without all this junk on the stack */ - /* FIXME(klempner): Refcount properly */ - slice_state_transfer_ownership(&read_state, &final_slices, - &final_nslices); - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); - } else { - /* Spurious read event, consume it here */ - slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); - } - } else { - /* TODO(klempner): Log interesting errors */ - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); + if (read_bytes < 0) { + /* NB: After calling the user_cb a parallel call of the read handler may + * be running. */ + if (errno == EAGAIN) { + if (tcp->iov_size > 1) { + tcp->iov_size /= 2; } - return; - } else if (read_bytes == 0) { - /* 0 read size ==> end of stream */ if (slice_state_has_available(&read_state)) { - /* there were bytes already read: pass them up to the application */ + /* TODO(klempner): We should probably do the call into the application + without all this junk on the stack */ + /* FIXME(klempner): Refcount properly */ slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices); - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); + tcp->finished_edge = 1; + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); } else { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); + /* Spurious read event, consume it here */ + slice_state_destroy(&read_state); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } + } else { + /* TODO(klempner): Log interesting errors */ + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); slice_state_destroy(&read_state); grpc_tcp_unref(tcp); - return; - } else if (tcp->iov_size < MAX_READ_IOVEC) { + } + } else if (read_bytes == 0) { + /* 0 read size ==> end of stream */ + if (slice_state_has_available(&read_state)) { + /* there were bytes already read: pass them up to the application */ + slice_state_transfer_ownership(&read_state, &final_slices, + &final_nslices); + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); + } else { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); + } + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); + } else { + if (tcp->iov_size < MAX_READ_IOVEC) { ++tcp->iov_size; } + GPR_ASSERT(slice_state_has_available(&read_state)); + slice_state_transfer_ownership(&read_state, &final_slices, + &final_nslices); + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); } + GRPC_TIMER_MARK(HANDLE_READ_END, 0); } +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { + grpc_tcp *tcp = (grpc_tcp *)arg; + GPR_ASSERT(!tcp->finished_edge); + + if (!success) { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); + grpc_tcp_unref(tcp); + } else { + grpc_tcp_continue_read(tcp); + } +} + static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, void *user_data) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -419,7 +431,12 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, tcp->read_cb = cb; tcp->read_user_data = user_data; gpr_ref(&tcp->refcount); - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); + if (tcp->finished_edge) { + tcp->finished_edge = 0; + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); + } else { + grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp); + } } #define MAX_WRITE_IOVEC 16 @@ -558,6 +575,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { tcp->write_user_data = NULL; tcp->slice_size = slice_size; tcp->iov_size = 1; + tcp->finished_edge = 1; slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); -- cgit v1.2.3 From 10b9cb5a38480d6acea62e890936914f89b6aa7f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 28 Apr 2015 10:00:15 -0700 Subject: Refine need_more_data We don't need to request more transport data if we: - already have queued messages - are not ready to receive messages --- src/core/surface/call.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 6ca1b4e9a1..f358eb546a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -421,14 +421,14 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static int need_more_data(grpc_call *call) { return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || - is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) || + (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) || is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || is_op_live(call, GRPC_IOREQ_RECV_STATUS) || is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || (call->write_state == WRITE_STATE_INITIAL && !call->is_client && - call->read_state != READ_STATE_STREAM_CLOSED); + call->read_state < READ_STATE_GOT_INITIAL_METADATA); } static void unlock(grpc_call *call) { -- cgit v1.2.3 From 6a8c038d2d569aec1c3dd97484088e415aaab1ad Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 28 Apr 2015 10:01:22 -0700 Subject: Add flow control tracing Allow some insight into what the flow controller is doing --- src/core/surface/init.c | 1 + src/core/transport/chttp2_transport.c | 22 ++++++++++++++++++++++ src/core/transport/chttp2_transport.h | 1 + 3 files changed, 24 insertions(+) diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 5a119a47cc..a5d86d0071 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -59,6 +59,7 @@ void grpc_init(void) { grpc_register_tracer("channel", &grpc_trace_channel); grpc_register_tracer("surface", &grpc_surface_trace); grpc_register_tracer("http", &grpc_http_trace); + grpc_register_tracer("flowctl", &grpc_flowctl_trace); grpc_register_tracer("batch", &grpc_trace_batch); grpc_security_pre_init(); grpc_iomgr_init(); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 26c550c1f1..65161bd1af 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -64,6 +64,7 @@ #define CLIENT_CONNECT_STRLEN 24 int grpc_http_trace = 0; +int grpc_flowctl_trace = 0; typedef struct transport transport; typedef struct stream stream; @@ -74,6 +75,12 @@ typedef struct stream stream; else \ stmt +#define FLOWCTL_TRACE(t, obj, dir, id, delta) \ + if (!grpc_flowctl_trace) \ + ; \ + else \ + flowctl_trace(t, #dir, obj->dir##_window, id, delta) + /* streams are kept in various linked lists depending on what things need to happen to them... this enum labels each list */ typedef enum { @@ -382,6 +389,12 @@ static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op); static void add_metadata_batch(transport *t, stream *s); +static void flowctl_trace(transport *t, const char *flow, gpr_int32 window, + gpr_uint32 id, gpr_int32 delta) { + gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window, + delta, window + delta); +} + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -894,6 +907,8 @@ static int prepare_write(transport *t) { window_delta = grpc_chttp2_preencode( s->outgoing_sopb->ops, &s->outgoing_sopb->nops, GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); + FLOWCTL_TRACE(t, t, outgoing, 0, -window_delta); + FLOWCTL_TRACE(t, s, outgoing, s->id, -window_delta); t->outgoing_window -= window_delta; s->outgoing_window -= window_delta; @@ -922,6 +937,7 @@ static int prepare_write(transport *t) { if (!s->read_closed && window_delta) { gpr_slice_buffer_add( &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); + FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); s->incoming_window += window_delta; } } @@ -931,6 +947,7 @@ static int prepare_write(transport *t) { window_delta = t->connection_window_target - t->incoming_window; gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create(0, window_delta)); + FLOWCTL_TRACE(t, t, incoming, 0, window_delta); t->incoming_window += window_delta; } @@ -1253,6 +1270,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { return GRPC_CHTTP2_CONNECTION_ERROR; } + FLOWCTL_TRACE(t, t, incoming, 0, -t->incoming_frame_size); + FLOWCTL_TRACE(t, s, incoming, s->id, -t->incoming_frame_size); t->incoming_window -= t->incoming_frame_size; s->incoming_window -= t->incoming_frame_size; @@ -1599,6 +1618,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { for (i = 0; i < t->stream_map.count; i++) { stream *s = (stream *)(t->stream_map.values[i]); int was_window_empty = s->outgoing_window <= 0; + FLOWCTL_TRACE(t, s, outgoing, s->id, st.initial_window_update); s->outgoing_window += st.initial_window_update; if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb && s->outgoing_sopb->nops > 0) { @@ -1617,6 +1637,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { GRPC_CHTTP2_FLOW_CONTROL_ERROR), GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1); } else { + FLOWCTL_TRACE(t, s, outgoing, s->id, st.window_update); s->outgoing_window += st.window_update; /* if this window update makes outgoing ops writable again, flag that */ @@ -1631,6 +1652,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { if (!is_window_update_legal(st.window_update, t->outgoing_window)) { drop_connection(t); } else { + FLOWCTL_TRACE(t, t, outgoing, 0, st.window_update); t->outgoing_window += st.window_update; } } diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h index a7f1b9a864..fad714fabf 100644 --- a/src/core/transport/chttp2_transport.h +++ b/src/core/transport/chttp2_transport.h @@ -38,6 +38,7 @@ #include "src/core/transport/transport.h" extern int grpc_http_trace; +extern int grpc_flowctl_trace; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void *arg, -- cgit v1.2.3 From 8119c0a08f977180c225d8447cd5e5337d6cf80f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 28 Apr 2015 17:05:09 -0700 Subject: Capure thread id with default timer implementation --- src/core/profiling/timers.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/core/profiling/timers.c b/src/core/profiling/timers.c index 7cc79bd22b..bd1700ffd8 100644 --- a/src/core/profiling/timers.c +++ b/src/core/profiling/timers.c @@ -40,10 +40,12 @@ #include #include #include +#include #include typedef struct grpc_timer_entry { grpc_precise_clock tm; + gpr_thd_id thd; const char* tag; void* id; const char* file; @@ -85,7 +87,7 @@ static void log_report_locked(grpc_timers_log* log) { grpc_timer_entry* entry = &(log->log[i]); fprintf(fp, "GRPC_LAT_PROF "); grpc_precise_clock_print(&entry->tm, fp); - fprintf(fp, " %s %p %s %d\n", entry->tag, entry->id, entry->file, + fprintf(fp, " %p %s %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->tag, entry->id, entry->file, entry->line); } @@ -121,6 +123,7 @@ void grpc_timers_log_add(grpc_timers_log* log, const char* tag, void* id, entry->id = id; entry->file = file; entry->line = line; + entry->thd = gpr_thd_currentid(); gpr_mu_unlock(&log->mu); } -- cgit v1.2.3 From 50968495795f3e491a54f6361acb957141004e24 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 28 Apr 2015 17:05:49 -0700 Subject: Add some helpful timing markers --- src/core/surface/call.c | 3 +++ src/core/transport/chttp2_transport.c | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/src/core/surface/call.c b/src/core/surface/call.c index f358eb546a..fc07a19894 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -34,6 +34,7 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" #include "src/core/iomgr/alarm.h" +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" #include "src/core/surface/channel.h" @@ -701,6 +702,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { static void call_on_done_recv(void *pc, int success) { grpc_call *call = pc; size_t i; + GRPC_TIMER_MARK(CALL_ON_DONE_RECV_BEGIN, 0); lock(call); call->receiving = 0; if (success) { @@ -741,6 +743,7 @@ static void call_on_done_recv(void *pc, int success) { unlock(call); GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0); + GRPC_TIMER_MARK(CALL_ON_DONE_RECV_END, 0); } static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 65161bd1af..c78d64d2dd 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -37,6 +37,7 @@ #include #include +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/frame_data.h" #include "src/core/transport/chttp2/frame_goaway.h" @@ -783,6 +784,8 @@ static void unlock(transport *t) { grpc_stream_op_buffer nuke_now; const grpc_transport_callbacks *cb = t->cb; + GRPC_TIMER_MARK(HTTP2_UNLOCK_BEGIN, 0); + grpc_sopb_init(&nuke_now); if (t->nuke_later_sopb.nops) { grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb); @@ -831,6 +834,8 @@ static void unlock(transport *t) { /* finally unlock */ gpr_mu_unlock(&t->mu); + GRPC_TIMER_MARK(HTTP2_UNLOCK_CLEANUP, 0); + /* perform some callbacks if necessary */ for (i = 0; i < num_goaways; i++) { cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug); @@ -861,6 +866,8 @@ static void unlock(transport *t) { grpc_sopb_destroy(&nuke_now); gpr_free(goaways); + + GRPC_TIMER_MARK(HTTP2_UNLOCK_END, 0); } /* -- cgit v1.2.3 From 39d4d72eb6c3b34664ce4d65cfa4f940c78ee536 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 29 Apr 2015 12:52:35 -0700 Subject: Cast away errors on Windows --- src/core/transport/chttp2_transport.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 264e653124..dd7c60710b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -914,8 +914,8 @@ static int prepare_write(transport *t) { window_delta = grpc_chttp2_preencode( s->outgoing_sopb->ops, &s->outgoing_sopb->nops, GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); - FLOWCTL_TRACE(t, t, outgoing, 0, -window_delta); - FLOWCTL_TRACE(t, s, outgoing, s->id, -window_delta); + FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); + FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); t->outgoing_window -= window_delta; s->outgoing_window -= window_delta; @@ -1277,8 +1277,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { return GRPC_CHTTP2_CONNECTION_ERROR; } - FLOWCTL_TRACE(t, t, incoming, 0, -t->incoming_frame_size); - FLOWCTL_TRACE(t, s, incoming, s->id, -t->incoming_frame_size); + FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size); + FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size); t->incoming_window -= t->incoming_frame_size; s->incoming_window -= t->incoming_frame_size; -- cgit v1.2.3 From dba5e0f96be51cce1343a31b5bd3734342642a29 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 29 Apr 2015 12:53:49 -0700 Subject: Tweak comment --- src/core/iomgr/tcp_posix.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 3825713aba..cef5e09760 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -375,7 +375,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) { slice_state_destroy(&read_state); grpc_tcp_unref(tcp); } else { - /* Spurious read event, consume it here */ + /* We've consumed the edge, request a new one */ slice_state_destroy(&read_state); grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } -- cgit v1.2.3 From e10ebf15ebcec86428c036e47d73328269b72b5e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 30 Apr 2015 13:12:31 -0700 Subject: Keep code that is common between both async client versions in a new async superclass and only use unary/streaming-specific material in the subclass. --- test/cpp/qps/client_async.cc | 133 ++++++++++++++++--------------------------- 1 file changed, 49 insertions(+), 84 deletions(-) diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 0a6d9beeca..1b039b3dc1 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -130,39 +130,26 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { response_reader_; }; -class AsyncUnaryClient GRPC_FINAL : public Client { +class AsyncClient : public Client { public: - explicit AsyncUnaryClient(const ClientConfig& config) : Client(config) { + explicit AsyncClient(const ClientConfig& config, + void (*setup_ctx)(CompletionQueue*, TestService::Stub*, + const SimpleRequest&)) : + Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); } - - auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (auto channel = channels_.begin(); channel != channels_.end(); channel++) { auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request, void* tag) { - return stub->AsyncUnaryCall(ctx, request, cq, tag); - }; - - TestService::Stub* stub = channel->get_stub(); - const SimpleRequest& request = request_; - new ClientRpcContextUnaryImpl( - stub, request, start_req, check_done); + (*setup_ctx)(cq, channel->get_stub(), request_); } } - - StartThreads(config.async_client_threads()); } - - ~AsyncUnaryClient() GRPC_OVERRIDE { - EndThreads(); - + virtual ~AsyncClient() { for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { (*cq)->Shutdown(); void* got_tag; @@ -173,10 +160,13 @@ class AsyncUnaryClient GRPC_FINAL : public Client { } } - bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(Histogram* histogram, size_t thread_idx) + GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) { + switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, + std::chrono::system_clock::now() + + std::chrono::seconds(1))) { case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::TIMEOUT: return true; case CompletionQueue::GOT_EVENT: break; @@ -192,10 +182,30 @@ class AsyncUnaryClient GRPC_FINAL : public Client { return true; } - + private: std::vector> cli_cqs_; }; +class AsyncUnaryClient GRPC_FINAL : public AsyncClient { + public: + explicit AsyncUnaryClient(const ClientConfig& config) : + AsyncClient(config, SetupCtx) { + StartThreads(config.async_client_threads()); + } + ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } +private: + static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { + auto check_done = [](grpc::Status s, SimpleResponse* response) {}; + auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request, void* tag) { + return stub->AsyncUnaryCall(ctx, request, cq, tag); + }; + new ClientRpcContextUnaryImpl( + stub, req, start_req, check_done); + } +}; + template class ClientRpcContextStreamingImpl : public ClientRpcContext { public: @@ -241,7 +251,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return(false); } next_state_ = &ClientRpcContextStreamingImpl::ReadDone; - stream_->Read(&response_, ClientRpcContext::tag(this)); + stream_->Read(&response_, ClientRpcContext::tag(this)); return true; } bool ReadDone(bool ok, Histogram *hist) { @@ -263,71 +273,26 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { stream_; }; -class AsyncStreamingClient GRPC_FINAL : public Client { +class AsyncStreamingClient GRPC_FINAL : public AsyncClient { public: - explicit AsyncStreamingClient(const ClientConfig &config) : Client(config) { - for (int i = 0; i < config.async_client_threads(); i++) { - cli_cqs_.emplace_back(new CompletionQueue); - } - - auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - - int t = 0; - for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (auto channel = channels_.begin(); channel != channels_.end(); - channel++) { - auto* cq = cli_cqs_[t].get(); - t = (t + 1) % cli_cqs_.size(); - auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, - void *tag) { - auto stream = stub->AsyncStreamingCall(ctx, cq, tag); - return stream; - }; - - TestService::Stub *stub = channel->get_stub(); - const SimpleRequest &request = request_; - new ClientRpcContextStreamingImpl( - stub, request, start_req, check_done); - } - } - + explicit AsyncStreamingClient(const ClientConfig &config) : + AsyncClient(config, SetupCtx) { StartThreads(config.async_client_threads()); } - ~AsyncStreamingClient() GRPC_OVERRIDE { - EndThreads(); - - for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { - (*cq)->Shutdown(); - void *got_tag; - bool ok; - while ((*cq)->Next(&got_tag, &ok)) { - delete ClientRpcContext::detag(got_tag); - } - } - } - - bool ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE { - void *got_tag; - bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) { - case CompletionQueue::SHUTDOWN: return false; - case CompletionQueue::TIMEOUT: return true; - case CompletionQueue::GOT_EVENT: break; - } - - ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then delete it - ctx->RunNextState(ok, histogram); - ctx->StartNewClone(); - delete ctx; - } - - return true; + ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } +private: + static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { + auto check_done = [](grpc::Status s, SimpleResponse* response) {}; + auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, + void *tag) { + auto stream = stub->AsyncStreamingCall(ctx, cq, tag); + return stream; + }; + new ClientRpcContextStreamingImpl( + stub, req, start_req, check_done); } - - std::vector> cli_cqs_; }; std::unique_ptr CreateAsyncUnaryClient(const ClientConfig& args) { -- cgit v1.2.3 From 12b9fc031a38fede8a53e63132522c7166bf589f Mon Sep 17 00:00:00 2001 From: Donna Dionne Date: Fri, 1 May 2015 15:38:40 -0700 Subject: adding php to cloud to prod tests. --- tools/gce_setup/cloud_prod_runner.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/gce_setup/cloud_prod_runner.sh b/tools/gce_setup/cloud_prod_runner.sh index 0389ac9dc1..812be4061c 100755 --- a/tools/gce_setup/cloud_prod_runner.sh +++ b/tools/gce_setup/cloud_prod_runner.sh @@ -38,7 +38,7 @@ main() { source grpc_docker.sh test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response) auth_test_cases=(service_account_creds compute_engine_creds jwt_token_creds) - clients=(cxx java go ruby node csharp_mono python) + clients=(cxx java go ruby node csharp_mono python php) for test_case in "${test_cases[@]}" do for client in "${clients[@]}" -- cgit v1.2.3 From 041998d9862654a2b60ae8e3e62831b9371e3c95 Mon Sep 17 00:00:00 2001 From: vjpai Date: Mon, 4 May 2015 09:24:08 -0700 Subject: Change function-pointer to std::function --- test/cpp/qps/client_async.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 1b039b3dc1..0aec1b1a57 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -133,8 +133,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { class AsyncClient : public Client { public: explicit AsyncClient(const ClientConfig& config, - void (*setup_ctx)(CompletionQueue*, TestService::Stub*, - const SimpleRequest&)) : + std::function setup_ctx) : Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); @@ -145,7 +145,7 @@ class AsyncClient : public Client { channel++) { auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - (*setup_ctx)(cq, channel->get_stub(), request_); + setup_ctx(cq, channel->get_stub(), request_); } } } -- cgit v1.2.3 From 27c0bf4d30c167b6fa5c75ca15fb364181d9a9a2 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 4 May 2015 09:31:12 -0700 Subject: Fixed typo --- gRPC.podspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gRPC.podspec b/gRPC.podspec index fe5fe2cc18..418330f5ba 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -4,7 +4,7 @@ Pod::Spec.new do |s| s.summary = 'Generic gRPC client library for iOS' s.homepage = 'https://www.grpc.io' s.license = 'New BSD' - s.authors = { 'Jorge Canizales' => 'jcanizales@google.com' + s.authors = { 'Jorge Canizales' => 'jcanizales@google.com', 'Michael Lumish' => 'mlumish@google.com' } # s.source = { :git => 'https://github.com/grpc/grpc.git', :tag => 'release-0_5_0' } -- cgit v1.2.3 From 5fd67940b869182027b89a2ee8a08c066b74e403 Mon Sep 17 00:00:00 2001 From: vjpai Date: Mon, 4 May 2015 09:45:32 -0700 Subject: Remove gcc 4.5 testing and keep 4.4 --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 1f96c6fc00..c1086c1765 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,7 +25,6 @@ env: - CONFIG=opt TEST=python - CONFIG=opt TEST=csharp - USE_GCC=4.4 CONFIG=opt TEST=build - - USE_GCC=4.5 CONFIG=opt TEST=build script: - rvm use $RUBY_VERSION - gem install bundler -- cgit v1.2.3 From 4b40559eae53d52dba211bdbc3028bf06b9be098 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 4 May 2015 10:15:26 -0700 Subject: Remove dead file --- src/core/channel/call_op_string.c | 141 -------------------------------------- 1 file changed, 141 deletions(-) delete mode 100644 src/core/channel/call_op_string.c diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c deleted file mode 100644 index 5f7e1be268..0000000000 --- a/src/core/channel/call_op_string.c +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/channel/channel_stack.h" - -#include -#include -#include - -#include "src/core/support/string.h" -#include -#include - -static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { - gpr_strvec_add(b, gpr_strdup(" key=")); - gpr_strvec_add( - b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), - GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT)); - - gpr_strvec_add(b, gpr_strdup(" value=")); - gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice), - GPR_SLICE_LENGTH(md->value->slice), - GPR_HEXDUMP_PLAINTEXT)); -} - -static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { - grpc_linked_mdelem *m; - for (m = md.list.head; m != NULL; m = m->next) { - put_metadata(b, m->md); - } - if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) { - char *tmp; - gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, - md.deadline.tv_nsec); - gpr_strvec_add(b, tmp); - } -} - -char *grpc_call_op_string(grpc_call_op *op) { - char *tmp; - char *out; - - gpr_strvec b; - gpr_strvec_init(&b); - - switch (op->dir) { - case GRPC_CALL_DOWN: - gpr_strvec_add(&b, gpr_strdup(">")); - break; - case GRPC_CALL_UP: - gpr_strvec_add(&b, gpr_strdup("<")); - break; - } - switch (op->type) { - case GRPC_SEND_METADATA: - gpr_strvec_add(&b, gpr_strdup("SEND_METADATA")); - put_metadata_list(&b, op->data.metadata); - break; - case GRPC_SEND_MESSAGE: - gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE")); - break; - case GRPC_SEND_PREFORMATTED_MESSAGE: - gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE")); - break; - case GRPC_SEND_FINISH: - gpr_strvec_add(&b, gpr_strdup("SEND_FINISH")); - break; - case GRPC_REQUEST_DATA: - gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA")); - break; - case GRPC_RECV_METADATA: - gpr_strvec_add(&b, gpr_strdup("RECV_METADATA")); - put_metadata_list(&b, op->data.metadata); - break; - case GRPC_RECV_MESSAGE: - gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); - break; - case GRPC_RECV_HALF_CLOSE: - gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE")); - break; - case GRPC_RECV_FINISH: - gpr_strvec_add(&b, gpr_strdup("RECV_FINISH")); - break; - case GRPC_RECV_SYNTHETIC_STATUS: - gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'", - op->data.synthetic_status.status, - op->data.synthetic_status.message); - gpr_strvec_add(&b, tmp); - break; - case GRPC_CANCEL_OP: - gpr_strvec_add(&b, gpr_strdup("CANCEL_OP")); - break; - } - gpr_asprintf(&tmp, " flags=0x%08x", op->flags); - gpr_strvec_add(&b, tmp); - if (op->bind_pollset) { - gpr_strvec_add(&b, gpr_strdup("bind_pollset")); - } - - out = gpr_strvec_flatten(&b, NULL); - gpr_strvec_destroy(&b); - - return out; -} - -void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_call_op *op) { - char *str = grpc_call_op_string(op); - gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); - gpr_free(str); -} -- cgit v1.2.3