diff options
author | Julien Boeuf <jboeuf@google.com> | 2015-10-17 22:14:29 -0700 |
---|---|---|
committer | Julien Boeuf <jboeuf@google.com> | 2015-10-17 22:14:29 -0700 |
commit | 3bb61d8917a506c8bbeadfa651f59ec37c74849f (patch) | |
tree | a5b5ecd11d25d354de1b774906fed642d4cb562f /src | |
parent | 18a12c4e9b1b86a9b29302ed8aac9dc9e5cecfe3 (diff) | |
parent | 627f98454cf120d1b91205b381dcbc1c9c2cccfb (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into core_creds_api_change
Diffstat (limited to 'src')
108 files changed, 1747 insertions, 965 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 1bf2b16ed6..3c8ca8ab45 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -585,7 +585,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer, "class Service : public ::grpc::SynchronousService {\n" " public:\n"); printer->Indent(); - printer->Print("Service() : service_(nullptr) {}\n"); + printer->Print("Service();\n"); printer->Print("virtual ~Service();\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderServerMethodSync(printer, service->method(i), vars); @@ -594,7 +594,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer, printer->Outdent(); printer->Print( " private:\n" - " ::grpc::RpcService* service_;\n"); + " std::unique_ptr< ::grpc::RpcService> service_;\n"); printer->Print("};\n"); // Server side - Asynchronous @@ -1014,8 +1014,10 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, "{}\n\n"); printer->Print(*vars, + "$ns$$Service$::Service::Service() {\n" + "}\n\n"); + printer->Print(*vars, "$ns$$Service$::Service::~Service() {\n" - " delete service_;\n" "}\n\n"); for (int i = 0; i < service->method_count(); ++i) { (*vars)["Idx"] = as_string(i); @@ -1026,10 +1028,10 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, "::grpc::RpcService* $ns$$Service$::Service::service() {\n"); printer->Indent(); printer->Print( - "if (service_ != nullptr) {\n" - " return service_;\n" + "if (service_) {\n" + " return service_.get();\n" "}\n"); - printer->Print("service_ = new ::grpc::RpcService();\n"); + printer->Print("service_ = std::unique_ptr< ::grpc::RpcService>(new ::grpc::RpcService());\n"); for (int i = 0; i < service->method_count(); ++i) { const grpc::protobuf::MethodDescriptor *method = service->method(i); (*vars)["Idx"] = as_string(i); @@ -1077,7 +1079,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } } - printer->Print("return service_;\n"); + printer->Print("return service_.get();\n"); printer->Outdent(); printer->Print("}\n\n"); } diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index 3545307b66..872543057e 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -132,6 +132,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *initial_op) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); + memset(d, 0, sizeof(*d)); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); if (initial_op) client_mutate_op(elem, initial_op); } @@ -149,6 +150,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *initial_op) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); + memset(d, 0, sizeof(*d)); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); /* TODO(hongyu): call census_tracing_start_op here. */ grpc_closure_init(d->on_done_recv, server_on_done_recv, elem); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 08aa721a4d..9f85557ea1 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -36,16 +36,18 @@ #include <stdio.h> #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" -#include "src/core/surface/channel.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" +#include "src/core/surface/channel.h" #include "src/core/transport/connectivity_state.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/sync.h> -#include <grpc/support/useful.h> /* Client channel implementation */ @@ -196,13 +198,12 @@ static int is_empty(void *p, int len) { return 1; } -static void started_call(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { +static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_transport_stream_op op; int have_waiting; - gpr_mu_lock(&calld->mu_state); if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { memset(&op, 0, sizeof(op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; @@ -230,10 +231,20 @@ static void started_call(grpc_exec_ctx *exec_ctx, void *arg, } } +static void started_call(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + call_data *calld = arg; + gpr_mu_lock(&calld->mu_state); + started_call_locked(exec_ctx, arg, iomgr_success); +} + static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; + grpc_subchannel_call_create_status call_creation_status; + + GPR_TIMER_BEGIN("picked_target", 0); if (calld->picked_channel == NULL) { /* treat this like a cancellation */ @@ -248,13 +259,19 @@ static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); calld->state = CALL_WAITING_FOR_CALL; pollset = calld->waiting_op.bind_pollset; - gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset, - &calld->subchannel_call, - &calld->async_setup_task); + call_creation_status = grpc_subchannel_create_call( + exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, + &calld->async_setup_task); + if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) { + started_call_locked(exec_ctx, calld, iomgr_success); + } else { + gpr_mu_unlock(&calld->mu_state); + } } } + + GPR_TIMER_END("picked_target", 0); } static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, @@ -315,6 +332,7 @@ static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; + GPR_TIMER_BEGIN("perform_transport_stream_op", 0); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -426,6 +444,8 @@ static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, } break; } + + GPR_TIMER_END("perform_transport_stream_op", 0); } static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index c32f150715..20b5084044 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -41,6 +41,7 @@ #include "src/core/channel/compress_filter.h" #include "src/core/channel/channel_args.h" +#include "src/core/profiling/timers.h" #include "src/core/compression/message_compress.h" #include "src/core/support/string.h" @@ -271,10 +272,14 @@ static void process_send_ops(grpc_call_element *elem, static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0); + if (op->send_ops && op->send_ops->nops > 0) { process_send_ops(elem, op->send_ops); } + GPR_TIMER_END("compress_start_transport_stream_op", 0); + /* pass control down the stack */ grpc_call_next_op(exec_ctx, elem, op); } diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index f9fc280259..6d4d7be632 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -39,6 +39,7 @@ #include "src/core/support/string.h" #include "src/core/transport/transport.h" +#include "src/core/profiling/timers.h" #include <grpc/byte_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index d67dc37ad2..f78a5cc315 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -36,6 +36,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include "src/core/support/string.h" +#include "src/core/profiling/timers.h" typedef struct call_data { grpc_linked_mdelem method; @@ -162,8 +163,10 @@ static void hc_mutate_op(grpc_call_element *elem, static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("hc_start_transport_op", 0); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); hc_mutate_op(elem, op); + GPR_TIMER_END("hc_start_transport_op", 0); grpc_call_next_op(exec_ctx, elem, op); } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 5e6d684a52..99e5066a4e 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -36,6 +36,7 @@ #include <string.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/profiling/timers.h" typedef struct call_data { gpr_uint8 got_initial_metadata; @@ -230,8 +231,10 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + GPR_TIMER_BEGIN("hs_start_transport_op", 0); hs_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); + GPR_TIMER_END("hs_start_transport_op", 0); } /* Constructor for call_data */ diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 4b3aaab09c..e5bf0680ff 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -175,7 +175,7 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } -static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, +static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; size_t i; @@ -235,7 +235,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1); + grpc_exec_ctx_enqueue(exec_ctx, + grpc_closure_create(destroy_subchannels, p), 1); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a2c521a20d..095000ba4f 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -40,7 +40,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" -#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/timer.h" #include "src/core/transport/connectivity_state.h" #include "src/core/surface/channel.h" @@ -130,7 +130,7 @@ struct grpc_subchannel { /** do we have an active alarm? */ int have_alarm; /** our alarm */ - grpc_alarm alarm; + grpc_timer alarm; /** current random value */ gpr_uint32 random; }; @@ -335,18 +335,20 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { + grpc_subchannel_call_create_status call_creation_status; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); - grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset, - w4c->target, w4c->notify); + call_creation_status = grpc_subchannel_create_call( + exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); + GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY); + w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } -void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify) { +grpc_subchannel_call_create_status grpc_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset, + grpc_subchannel_call **target, grpc_closure *notify) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -355,7 +357,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, gpr_mu_unlock(&c->mu); *target = create_call(exec_ctx, con); - notify->cb(exec_ctx, notify->cb_arg, 1); + return GRPC_SUBCHANNEL_CALL_CREATE_READY; } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -380,6 +382,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, } else { gpr_mu_unlock(&c->mu); } + return GRPC_SUBCHANNEL_CALL_CREATE_PENDING; } } @@ -459,7 +462,7 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, } if (cancel_alarm) { - grpc_alarm_cancel(exec_ctx, &c->alarm); + grpc_timer_cancel(exec_ctx, &c->alarm); } if (op->disconnect) { @@ -690,7 +693,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; connectivity_state_changed_locked(exec_ctx, c, "connect_failed"); - grpc_alarm_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 86b7fa5851..a26d08f02e 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -75,12 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -/** construct a call (possibly asynchronously) */ -void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify); +typedef enum { + GRPC_SUBCHANNEL_CALL_CREATE_READY, + GRPC_SUBCHANNEL_CALL_CREATE_PENDING +} grpc_subchannel_call_create_status; + +/** construct a subchannel call (possibly asynchronously). + * + * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will + * return immediately and \a target will point to a connected \a subchannel_call + * instance. Note that \a notify will \em not be invoked in this case. + * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the + * subchannel call will be created asynchronously, invoking the \a notify + * callback upon completion. */ +grpc_subchannel_call_create_status grpc_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, + grpc_subchannel_call **target, grpc_closure *notify); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index f2914d376e..410b34c521 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -35,18 +35,24 @@ #include <grpc/support/log.h> +#include "src/core/profiling/timers.h" + int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { int did_something = 0; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); while (!grpc_closure_list_empty(exec_ctx->closure_list)) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { grpc_closure *next = c->next; - did_something = 1; + did_something++; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); c->cb(exec_ctx, c->cb_arg, c->success); + GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); c = next; } } + GPR_TIMER_END("grpc_exec_ctx_flush", 0); return did_something; } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 231bc988a8..7ff80e6cf8 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -286,7 +286,7 @@ static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); - GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); + GPR_ASSERT(!fd->shutdown); fd->shutdown = 1; set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure); @@ -320,7 +320,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, gpr_mu_lock(&fd->mu); /* if we are shutdown, then don't add to the watcher set */ - if (gpr_atm_no_barrier_load(&fd->shutdown)) { + if (fd->shutdown) { watcher->fd = NULL; watcher->pollset = NULL; watcher->worker = NULL; diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index cebd863924..65da3a9d2d 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -42,7 +42,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/thd.h> -#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_windows.h" diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index e61fc32925..212ce5534d 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -43,7 +43,7 @@ #include <grpc/support/thd.h> #include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/timer_internal.h" #include "src/core/support/string.h" static gpr_mu g_mu; @@ -55,7 +55,7 @@ void grpc_iomgr_init(void) { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); - grpc_alarm_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_iomgr_platform_init(); @@ -98,7 +98,7 @@ void grpc_iomgr_shutdown(void) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_alarm_check(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), + if (grpc_timer_check(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(&exec_ctx); @@ -124,7 +124,7 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); - grpc_alarm_list_shutdown(&exec_ctx); + grpc_timer_list_shutdown(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); /* ensure all threads have left g_mu */ diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index ba9ba73f9d..2aafd21dfb 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -41,10 +41,11 @@ #include <sys/epoll.h> #include <unistd.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/support/block_annotate.h" +#include "src/core/profiling/timers.h" typedef struct wakeup_fd_hdl { grpc_wakeup_fd wakeup_fd; @@ -182,9 +183,11 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid even going into the blocking annotation if possible */ + GPR_TIMER_BEGIN("poll", 0); GRPC_SCHEDULING_START_BLOCKING_REGION; poll_rv = grpc_poll_function(pfds, 2, timeout_ms); GRPC_SCHEDULING_END_BLOCKING_REGION; + GPR_TIMER_END("poll", 0); if (poll_rv < 0) { if (errno != EINTR) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index d056866d13..bce1ce9714 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,7 +42,7 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_utils_posix.h" @@ -101,9 +101,12 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags) { + GPR_TIMER_BEGIN("grpc_pollset_kick_ext", 0); + /* pollset->mu already held */ if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + GPR_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0); GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; @@ -111,44 +114,50 @@ void grpc_pollset_kick_ext(grpc_pollset *p, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); } p->kicked_without_pollers = 1; - return; + GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0); } else if (gpr_tls_get(&g_current_thread_worker) != (gpr_intptr)specific_worker) { + GPR_TIMER_MARK("different_thread_worker", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); - return; } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { + GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); - return; } } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); + GPR_TIMER_MARK("kick_anonymous", 0); specific_worker = pop_front_worker(p); if (specific_worker != NULL) { if (gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { + GPR_TIMER_MARK("kick_anonymous_not_self", 0); push_back_worker(p, specific_worker); specific_worker = pop_front_worker(p); if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { push_back_worker(p, specific_worker); - return; + specific_worker = NULL; } } - push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); - return; + if (specific_worker != NULL) { + GPR_TIMER_MARK("finally_kick", 0); + push_back_worker(p, specific_worker); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + } } else { + GPR_TIMER_MARK("kicked_no_pollers", 0); p->kicked_without_pollers = 1; - return; } } + + GPR_TIMER_END("grpc_pollset_kick_ext", 0); } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { @@ -229,6 +238,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int locked = 1; int queued_work = 0; int keep_polling = 0; + GPR_TIMER_BEGIN("grpc_pollset_work", 0); /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; worker->reevaluate_polling_on_wakeup = 0; @@ -245,7 +255,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, each time through on every pollset. May update deadline to ensure timely wakeups. TODO(ctiller): can this work be localized? */ - if (grpc_alarm_check(exec_ctx, now, &deadline)) { + if (grpc_timer_check(exec_ctx, now, &deadline)) { gpr_mu_unlock(&pollset->mu); locked = 0; goto done; @@ -270,14 +280,15 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!added_worker) { push_front_worker(pollset, worker); added_worker = 1; + gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker); } gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); - gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker); + GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline, now); + GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); - gpr_tls_set(&g_current_thread_worker, 0); } else { pollset->kicked_without_pollers = 0; } @@ -307,6 +318,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (added_worker) { remove_worker(pollset, worker); + gpr_tls_set(&g_current_thread_worker, 0); } grpc_wakeup_fd_destroy(&worker->wakeup_fd); if (pollset->shutting_down) { @@ -329,6 +341,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } } + GPR_TIMER_END("grpc_pollset_work", 0); } void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -563,10 +576,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, even going into the blocking annotation if possible */ /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ + GPR_TIMER_BEGIN("poll", 0); GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfd, nfds, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; - GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); + GPR_TIMER_END("poll", 0); if (r < 0) { gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 96abaea0b3..9f74580273 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -37,7 +37,7 @@ #include <grpc/support/thd.h> -#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/pollset.h" @@ -136,7 +136,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker->kicked = 0; worker->pollset = pollset; gpr_cv_init(&worker->cv); - if (grpc_alarm_check(exec_ctx, now, &deadline)) { + if (grpc_timer_check(exec_ctx, now, &deadline)) { goto done; } if (!pollset->kicked_without_pollers && !pollset->shutting_down) { diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index fe20039264..abd6315ca1 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -42,7 +42,7 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/timer.h" #include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/sockaddr_utils.h" @@ -60,7 +60,7 @@ typedef struct { gpr_mu mu; grpc_fd *fd; gpr_timespec deadline; - grpc_alarm alarm; + grpc_timer alarm; int refs; grpc_closure write_closure; grpc_pollset_set *interested_parties; @@ -132,7 +132,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) { ac->fd = NULL; gpr_mu_unlock(&ac->mu); - grpc_alarm_cancel(exec_ctx, &ac->alarm); + grpc_timer_cancel(exec_ctx, &ac->alarm); gpr_mu_lock(&ac->mu); if (success) { @@ -290,7 +290,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, } gpr_mu_lock(&ac->mu); - grpc_alarm_init(exec_ctx, &ac->alarm, + grpc_timer_init(exec_ctx, &ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 3540c55676..e5691b7e12 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -43,7 +43,7 @@ #include <grpc/support/slice_buffer.h> #include <grpc/support/useful.h> -#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/timer.h" #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/tcp_client.h" #include "src/core/iomgr/tcp_windows.h" @@ -56,7 +56,7 @@ typedef struct { gpr_mu mu; grpc_winsocket *socket; gpr_timespec deadline; - grpc_alarm alarm; + grpc_timer alarm; char *addr_name; int refs; grpc_closure on_connect; @@ -91,7 +91,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, int from_iocp) { grpc_winsocket_callback_info *info = &ac->socket->write_info; grpc_closure *on_done = ac->on_done; - grpc_alarm_cancel(exec_ctx, &ac->alarm); + grpc_timer_cancel(exec_ctx, &ac->alarm); gpr_mu_lock(&ac->mu); @@ -201,7 +201,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, ac->endpoint = endpoint; grpc_closure_init(&ac->on_connect, on_connect, ac); - grpc_alarm_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac, + grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect); return; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 4a57037a72..915553d509 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -180,7 +180,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GPR_ASSERT(!tcp->finished_edge); GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC); GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); - GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); + GPR_TIMER_BEGIN("tcp_continue_read", 0); while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { gpr_slice_buffer_add_indexed(tcp->incoming_buffer, @@ -199,11 +199,11 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0); + GPR_TIMER_BEGIN("recvmsg", 1); do { read_bytes = recvmsg(tcp->fd, &msg, 0); } while (read_bytes < 0 && errno == EINTR); - GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0); + GPR_TIMER_END("recvmsg", 0); if (read_bytes < 0) { /* NB: After calling call_read_cb a parallel call of the read handler may @@ -240,7 +240,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { TCP_UNREF(exec_ctx, tcp, "read"); } - GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); + GPR_TIMER_END("tcp_continue_read", 0); } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -316,12 +316,12 @@ static flush_result tcp_flush(grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0); + GPR_TIMER_BEGIN("sendmsg", 1); do { /* TODO(klempner): Cork if this is a partial write */ sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); } while (sent_length < 0 && errno == EINTR); - GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0); + GPR_TIMER_END("sendmsg", 0); if (sent_length < 0) { if (errno == EAGAIN) { @@ -370,17 +370,17 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, return; } - GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); status = tcp_flush(tcp); if (status == FLUSH_PENDING) { grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; + GPR_TIMER_BEGIN("tcp_handle_write.cb", 0); cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE); + GPR_TIMER_END("tcp_handle_write.cb", 0); TCP_UNREF(exec_ctx, tcp, "write"); } - GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); } static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -399,11 +399,11 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } } - GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0); + GPR_TIMER_BEGIN("tcp_write", 0); GPR_ASSERT(tcp->write_cb == NULL); if (buf->length == 0) { - GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); + GPR_TIMER_END("tcp_write", 0); grpc_exec_ctx_enqueue(exec_ctx, cb, 1); return; } @@ -420,7 +420,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE); } - GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); + GPR_TIMER_END("tcp_write", 0); } static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 9ceffca065..5ff78231bd 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -44,7 +44,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> -#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/timer.h" #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/sockaddr.h" #include "src/core/iomgr/sockaddr_utils.h" diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/timer.c index 0ba5361606..66fafe75ad 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/timer.c @@ -31,10 +31,10 @@ * */ -#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/timer.h" -#include "src/core/iomgr/alarm_heap.h" -#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/timer_heap.h" +#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/time_averaged_stats.h" #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -51,37 +51,37 @@ typedef struct { gpr_mu mu; grpc_time_averaged_stats stats; - /* All and only alarms with deadlines <= this will be in the heap. */ + /* All and only timers with deadlines <= this will be in the heap. */ gpr_timespec queue_deadline_cap; gpr_timespec min_deadline; /* Index in the g_shard_queue */ gpr_uint32 shard_queue_index; - /* This holds all alarms with deadlines < queue_deadline_cap. Alarms in this + /* This holds all timers with deadlines < queue_deadline_cap. Timers in this list have the top bit of their deadline set to 0. */ - grpc_alarm_heap heap; - /* This holds alarms whose deadline is >= queue_deadline_cap. */ - grpc_alarm list; + grpc_timer_heap heap; + /* This holds timers whose deadline is >= queue_deadline_cap. */ + grpc_timer list; } shard_type; /* Protects g_shard_queue */ static gpr_mu g_mu; -/* Allow only one run_some_expired_alarms at once */ +/* Allow only one run_some_expired_timers at once */ static gpr_mu g_checker_mu; static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, +static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next, int success); static gpr_timespec compute_min_deadline(shard_type *shard) { - return grpc_alarm_heap_is_empty(&shard->heap) + return grpc_timer_heap_is_empty(&shard->heap) ? shard->queue_deadline_cap - : grpc_alarm_heap_top(&shard->heap)->deadline; + : grpc_timer_heap_top(&shard->heap)->deadline; } -void grpc_alarm_list_init(gpr_timespec now) { +void grpc_timer_list_init(gpr_timespec now) { gpr_uint32 i; gpr_mu_init(&g_mu); @@ -95,27 +95,27 @@ void grpc_alarm_list_init(gpr_timespec now) { 0.5); shard->queue_deadline_cap = now; shard->shard_queue_index = i; - grpc_alarm_heap_init(&shard->heap); + grpc_timer_heap_init(&shard->heap); shard->list.next = shard->list.prev = &shard->list; shard->min_deadline = compute_min_deadline(shard); g_shard_queue[i] = shard; } } -void grpc_alarm_list_shutdown(grpc_exec_ctx *exec_ctx) { +void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; - run_some_expired_alarms(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0); + run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); - grpc_alarm_heap_destroy(&shard->heap); + grpc_timer_heap_destroy(&shard->heap); } gpr_mu_destroy(&g_mu); gpr_mu_destroy(&g_checker_mu); } /* This is a cheap, but good enough, pointer hash for sharding the tasks: */ -static size_t shard_idx(const grpc_alarm *info) { +static size_t shard_idx(const grpc_timer *info) { size_t x = (size_t)info; return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1); } @@ -132,15 +132,15 @@ static gpr_timespec dbl_to_ts(double d) { return ts; } -static void list_join(grpc_alarm *head, grpc_alarm *alarm) { - alarm->next = head; - alarm->prev = head->prev; - alarm->next->prev = alarm->prev->next = alarm; +static void list_join(grpc_timer *head, grpc_timer *timer) { + timer->next = head; + timer->prev = head->prev; + timer->next->prev = timer->prev->next = timer; } -static void list_remove(grpc_alarm *alarm) { - alarm->next->prev = alarm->prev; - alarm->prev->next = alarm->next; +static void list_remove(grpc_timer *timer) { + timer->next->prev = timer->prev; + timer->prev->next = timer->next; } static void swap_adjacent_shards_in_queue(gpr_uint32 first_shard_queue_index) { @@ -170,16 +170,16 @@ static void note_deadline_change(shard_type *shard) { } } -void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm, - gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb, - void *alarm_cb_arg, gpr_timespec now) { - int is_first_alarm = 0; - shard_type *shard = &g_shards[shard_idx(alarm)]; +void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, + gpr_timespec deadline, grpc_iomgr_cb_func timer_cb, + void *timer_cb_arg, gpr_timespec now) { + int is_first_timer = 0; + shard_type *shard = &g_shards[shard_idx(timer)]; GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); - grpc_closure_init(&alarm->closure, alarm_cb, alarm_cb_arg); - alarm->deadline = deadline; - alarm->triggered = 0; + grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg); + timer->deadline = deadline; + timer->triggered = 0; /* TODO(ctiller): check deadline expired */ @@ -187,25 +187,25 @@ void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm, grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { - is_first_alarm = grpc_alarm_heap_add(&shard->heap, alarm); + is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { - alarm->heap_index = INVALID_HEAP_INDEX; - list_join(&shard->list, alarm); + timer->heap_index = INVALID_HEAP_INDEX; + list_join(&shard->list, timer); } gpr_mu_unlock(&shard->mu); /* Deadline may have decreased, we need to adjust the master queue. Note that there is a potential racy unlocked region here. There could be a - reordering of multiple grpc_alarm_init calls, at this point, but the < test + reordering of multiple grpc_timer_init calls, at this point, but the < test below should ensure that we err on the side of caution. There could - also be a race with grpc_alarm_check, which might beat us to the lock. In - that case, it is possible that the alarm that we added will have already + also be a race with grpc_timer_check, which might beat us to the lock. In + that case, it is possible that the timer that we added will have already run by the time we hold the lock, but that too is a safe error. - Finally, it's possible that the grpc_alarm_check that intervened failed to - trigger the new alarm because the min_deadline hadn't yet been reduced. - In that case, the alarm will simply have to wait for the next - grpc_alarm_check. */ - if (is_first_alarm) { + Finally, it's possible that the grpc_timer_check that intervened failed to + trigger the new timer because the min_deadline hadn't yet been reduced. + In that case, the timer will simply have to wait for the next + grpc_timer_check. */ + if (is_first_timer) { gpr_mu_lock(&g_mu); if (gpr_time_cmp(deadline, shard->min_deadline) < 0) { gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline; @@ -220,16 +220,16 @@ void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm, } } -void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm) { - shard_type *shard = &g_shards[shard_idx(alarm)]; +void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { + shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); - if (!alarm->triggered) { - grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, 0); - alarm->triggered = 1; - if (alarm->heap_index == INVALID_HEAP_INDEX) { - list_remove(alarm); + if (!timer->triggered) { + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, 0); + timer->triggered = 1; + if (timer->heap_index == INVALID_HEAP_INDEX) { + list_remove(timer); } else { - grpc_alarm_heap_remove(&shard->heap, alarm); + grpc_timer_heap_remove(&shard->heap, timer); } } gpr_mu_unlock(&shard->mu); @@ -237,7 +237,7 @@ void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm) { /* This is called when the queue is empty and "now" has reached the queue_deadline_cap. We compute a new queue deadline and then scan the map - for alarms that fall at or under it. Returns true if the queue is no + for timers that fall at or under it. Returns true if the queue is no longer empty. REQUIRES: shard->mu locked */ static int refill_queue(shard_type *shard, gpr_timespec now) { @@ -248,49 +248,49 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { double deadline_delta = GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION, MAX_QUEUE_WINDOW_DURATION); - grpc_alarm *alarm, *next; + grpc_timer *timer, *next; - /* Compute the new cap and put all alarms under it into the queue: */ + /* Compute the new cap and put all timers under it into the queue: */ shard->queue_deadline_cap = gpr_time_add( gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta)); - for (alarm = shard->list.next; alarm != &shard->list; alarm = next) { - next = alarm->next; + for (timer = shard->list.next; timer != &shard->list; timer = next) { + next = timer->next; - if (gpr_time_cmp(alarm->deadline, shard->queue_deadline_cap) < 0) { - list_remove(alarm); - grpc_alarm_heap_add(&shard->heap, alarm); + if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) { + list_remove(timer); + grpc_timer_heap_add(&shard->heap, timer); } } - return !grpc_alarm_heap_is_empty(&shard->heap); + return !grpc_timer_heap_is_empty(&shard->heap); } -/* This pops the next non-cancelled alarm with deadline <= now from the queue, +/* This pops the next non-cancelled timer with deadline <= now from the queue, or returns NULL if there isn't one. REQUIRES: shard->mu locked */ -static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) { - grpc_alarm *alarm; +static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { + grpc_timer *timer; for (;;) { - if (grpc_alarm_heap_is_empty(&shard->heap)) { + if (grpc_timer_heap_is_empty(&shard->heap)) { if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL; if (!refill_queue(shard, now)) return NULL; } - alarm = grpc_alarm_heap_top(&shard->heap); - if (gpr_time_cmp(alarm->deadline, now) > 0) return NULL; - alarm->triggered = 1; - grpc_alarm_heap_pop(&shard->heap); - return alarm; + timer = grpc_timer_heap_top(&shard->heap); + if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; + timer->triggered = 1; + grpc_timer_heap_pop(&shard->heap); + return timer; } } /* REQUIRES: shard->mu unlocked */ -static size_t pop_alarms(grpc_exec_ctx *exec_ctx, shard_type *shard, +static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, gpr_timespec now, gpr_timespec *new_min_deadline, int success) { size_t n = 0; - grpc_alarm *alarm; + grpc_timer *timer; gpr_mu_lock(&shard->mu); - while ((alarm = pop_one(shard, now))) { - grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, success); + while ((timer = pop_one(shard, now))) { + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success); n++; } *new_min_deadline = compute_min_deadline(shard); @@ -298,11 +298,11 @@ static size_t pop_alarms(grpc_exec_ctx *exec_ctx, shard_type *shard, return n; } -static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, +static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next, int success) { size_t n = 0; - /* TODO(ctiller): verify that there are any alarms (atomically) here */ + /* TODO(ctiller): verify that there are any timers (atomically) here */ if (gpr_mu_trylock(&g_checker_mu)) { gpr_mu_lock(&g_mu); @@ -310,16 +310,16 @@ static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { gpr_timespec new_min_deadline; - /* For efficiency, we pop as many available alarms as we can from the - shard. This may violate perfect alarm deadline ordering, but that + /* For efficiency, we pop as many available timers as we can from the + shard. This may violate perfect timer deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += pop_alarms(exec_ctx, g_shard_queue[0], now, &new_min_deadline, + n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, success); - /* An grpc_alarm_init() on the shard could intervene here, adding a new - alarm that is earlier than new_min_deadline. However, - grpc_alarm_init() will block on the master_lock before it can call - set_min_deadline, so this one will complete first and then the AddAlarm + /* An grpc_timer_init() on the shard could intervene here, adding a new + timer that is earlier than new_min_deadline. However, + grpc_timer_init() will block on the master_lock before it can call + set_min_deadline, so this one will complete first and then the Addtimer will reduce the min_deadline (perhaps unnecessarily). */ g_shard_queue[0]->min_deadline = new_min_deadline; note_deadline_change(g_shard_queue[0]); @@ -336,15 +336,15 @@ static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, return (int)n; } -int grpc_alarm_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, +int grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); - return run_some_expired_alarms( + return run_some_expired_timers( exec_ctx, now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); } -gpr_timespec grpc_alarm_list_next_timeout(void) { +gpr_timespec grpc_timer_list_next_timeout(void) { gpr_timespec out; gpr_mu_lock(&g_mu); out = g_shard_queue[0]->min_deadline; diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/timer.h index 94f9bc1355..9abe58133d 100644 --- a/src/core/iomgr/alarm.h +++ b/src/core/iomgr/timer.h @@ -31,59 +31,59 @@ * */ -#ifndef GRPC_INTERNAL_CORE_IOMGR_ALARM_H -#define GRPC_INTERNAL_CORE_IOMGR_ALARM_H +#ifndef GRPC_INTERNAL_CORE_IOMGR_TIMER_H +#define GRPC_INTERNAL_CORE_IOMGR_TIMER_H #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/exec_ctx.h" #include <grpc/support/port_platform.h> #include <grpc/support/time.h> -typedef struct grpc_alarm { +typedef struct grpc_timer { gpr_timespec deadline; gpr_uint32 heap_index; /* INVALID_HEAP_INDEX if not in heap */ int triggered; - struct grpc_alarm *next; - struct grpc_alarm *prev; + struct grpc_timer *next; + struct grpc_timer *prev; grpc_closure closure; -} grpc_alarm; +} grpc_timer; -/* Initialize *alarm. When expired or canceled, alarm_cb will be called with - *alarm_cb_arg and status to indicate if it expired (SUCCESS) or was - canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once, +/* Initialize *timer. When expired or canceled, timer_cb will be called with + *timer_cb_arg and status to indicate if it expired (SUCCESS) or was + canceled (CANCELLED). timer_cb is guaranteed to be called exactly once, and application code should check the status to determine how it was invoked. The application callback is also responsible for maintaining information about when to free up any user-level state. */ -void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm, - gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb, - void *alarm_cb_arg, gpr_timespec now); +void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, + gpr_timespec deadline, grpc_iomgr_cb_func timer_cb, + void *timer_cb_arg, gpr_timespec now); -/* Note that there is no alarm destroy function. This is because the - alarm is a one-time occurrence with a guarantee that the callback will +/* Note that there is no timer destroy function. This is because the + timer is a one-time occurrence with a guarantee that the callback will be called exactly once, either at expiration or cancellation. Thus, all - the internal alarm event management state is destroyed just before + the internal timer event management state is destroyed just before that callback is invoked. If the user has additional state associated with - the alarm, the user is responsible for determining when it is safe to + the timer, the user is responsible for determining when it is safe to destroy that state. */ -/* Cancel an *alarm. +/* Cancel an *timer. There are three cases: - 1. We normally cancel the alarm - 2. The alarm has already run - 3. We can't cancel the alarm because it is "in flight". + 1. We normally cancel the timer + 2. The timer has already run + 3. We can't cancel the timer because it is "in flight". In all of these cases, the cancellation is still considered successful. - They are essentially distinguished in that the alarm_cb will be run + They are essentially distinguished in that the timer_cb will be run exactly once from either the cancellation (with status CANCELLED) or from the activation (with status SUCCESS) Note carefully that the callback function MAY occur in the same callstack - as grpc_alarm_cancel. It's expected that most alarms will be cancelled (their + as grpc_timer_cancel. It's expected that most timers will be cancelled (their primary use is to implement deadlines), and so this code is optimized such that cancellation costs as little as possible. Making callbacks run inline matches this aim. - Requires: cancel() must happen after add() on a given alarm */ -void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm); + Requires: cancel() must happen after add() on a given timer */ +void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); -#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TIMER_H */ diff --git a/src/core/iomgr/alarm_heap.c b/src/core/iomgr/timer_heap.c index 769142e425..31d41d6750 100644 --- a/src/core/iomgr/alarm_heap.c +++ b/src/core/iomgr/timer_heap.c @@ -31,7 +31,7 @@ * */ -#include "src/core/iomgr/alarm_heap.h" +#include "src/core/iomgr/timer_heap.h" #include <string.h> @@ -43,7 +43,7 @@ position. This functor is called each time immediately after modifying a value in the underlying container, with the offset of the modified element as its argument. */ -static void adjust_upwards(grpc_alarm **first, gpr_uint32 i, grpc_alarm *t) { +static void adjust_upwards(grpc_timer **first, gpr_uint32 i, grpc_timer *t) { while (i > 0) { gpr_uint32 parent = (gpr_uint32)(((int)i - 1) / 2); if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break; @@ -58,8 +58,8 @@ static void adjust_upwards(grpc_alarm **first, gpr_uint32 i, grpc_alarm *t) { /* Adjusts a heap so as to move a hole at position i farther away from the root, until a suitable position is found for element t. Then, copies t into that position. */ -static void adjust_downwards(grpc_alarm **first, gpr_uint32 i, - gpr_uint32 length, grpc_alarm *t) { +static void adjust_downwards(grpc_timer **first, gpr_uint32 i, + gpr_uint32 length, grpc_timer *t) { for (;;) { gpr_uint32 left_child = 1u + 2u * i; gpr_uint32 right_child; @@ -83,66 +83,66 @@ static void adjust_downwards(grpc_alarm **first, gpr_uint32 i, #define SHRINK_MIN_ELEMS 8 #define SHRINK_FULLNESS_FACTOR 2 -static void maybe_shrink(grpc_alarm_heap *heap) { - if (heap->alarm_count >= 8 && - heap->alarm_count <= heap->alarm_capacity / SHRINK_FULLNESS_FACTOR / 2) { - heap->alarm_capacity = heap->alarm_count * SHRINK_FULLNESS_FACTOR; - heap->alarms = - gpr_realloc(heap->alarms, heap->alarm_capacity * sizeof(grpc_alarm *)); +static void maybe_shrink(grpc_timer_heap *heap) { + if (heap->timer_count >= 8 && + heap->timer_count <= heap->timer_capacity / SHRINK_FULLNESS_FACTOR / 2) { + heap->timer_capacity = heap->timer_count * SHRINK_FULLNESS_FACTOR; + heap->timers = + gpr_realloc(heap->timers, heap->timer_capacity * sizeof(grpc_timer *)); } } -static void note_changed_priority(grpc_alarm_heap *heap, grpc_alarm *alarm) { - gpr_uint32 i = alarm->heap_index; +static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) { + gpr_uint32 i = timer->heap_index; gpr_uint32 parent = (gpr_uint32)(((int)i - 1) / 2); - if (gpr_time_cmp(heap->alarms[parent]->deadline, alarm->deadline) < 0) { - adjust_upwards(heap->alarms, i, alarm); + if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) < 0) { + adjust_upwards(heap->timers, i, timer); } else { - adjust_downwards(heap->alarms, i, heap->alarm_count, alarm); + adjust_downwards(heap->timers, i, heap->timer_count, timer); } } -void grpc_alarm_heap_init(grpc_alarm_heap *heap) { +void grpc_timer_heap_init(grpc_timer_heap *heap) { memset(heap, 0, sizeof(*heap)); } -void grpc_alarm_heap_destroy(grpc_alarm_heap *heap) { gpr_free(heap->alarms); } +void grpc_timer_heap_destroy(grpc_timer_heap *heap) { gpr_free(heap->timers); } -int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm) { - if (heap->alarm_count == heap->alarm_capacity) { - heap->alarm_capacity = - GPR_MAX(heap->alarm_capacity + 1, heap->alarm_capacity * 3 / 2); - heap->alarms = - gpr_realloc(heap->alarms, heap->alarm_capacity * sizeof(grpc_alarm *)); +int grpc_timer_heap_add(grpc_timer_heap *heap, grpc_timer *timer) { + if (heap->timer_count == heap->timer_capacity) { + heap->timer_capacity = + GPR_MAX(heap->timer_capacity + 1, heap->timer_capacity * 3 / 2); + heap->timers = + gpr_realloc(heap->timers, heap->timer_capacity * sizeof(grpc_timer *)); } - alarm->heap_index = heap->alarm_count; - adjust_upwards(heap->alarms, heap->alarm_count, alarm); - heap->alarm_count++; - return alarm->heap_index == 0; + timer->heap_index = heap->timer_count; + adjust_upwards(heap->timers, heap->timer_count, timer); + heap->timer_count++; + return timer->heap_index == 0; } -void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm) { - gpr_uint32 i = alarm->heap_index; - if (i == heap->alarm_count - 1) { - heap->alarm_count--; +void grpc_timer_heap_remove(grpc_timer_heap *heap, grpc_timer *timer) { + gpr_uint32 i = timer->heap_index; + if (i == heap->timer_count - 1) { + heap->timer_count--; maybe_shrink(heap); return; } - heap->alarms[i] = heap->alarms[heap->alarm_count - 1]; - heap->alarms[i]->heap_index = i; - heap->alarm_count--; + heap->timers[i] = heap->timers[heap->timer_count - 1]; + heap->timers[i]->heap_index = i; + heap->timer_count--; maybe_shrink(heap); - note_changed_priority(heap, heap->alarms[i]); + note_changed_priority(heap, heap->timers[i]); } -int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap) { - return heap->alarm_count == 0; +int grpc_timer_heap_is_empty(grpc_timer_heap *heap) { + return heap->timer_count == 0; } -grpc_alarm *grpc_alarm_heap_top(grpc_alarm_heap *heap) { - return heap->alarms[0]; +grpc_timer *grpc_timer_heap_top(grpc_timer_heap *heap) { + return heap->timers[0]; } -void grpc_alarm_heap_pop(grpc_alarm_heap *heap) { - grpc_alarm_heap_remove(heap, grpc_alarm_heap_top(heap)); +void grpc_timer_heap_pop(grpc_timer_heap *heap) { + grpc_timer_heap_remove(heap, grpc_timer_heap_top(heap)); } diff --git a/src/core/iomgr/alarm_heap.h b/src/core/iomgr/timer_heap.h index 91d6ee3ca2..cd5258f93e 100644 --- a/src/core/iomgr/alarm_heap.h +++ b/src/core/iomgr/timer_heap.h @@ -31,27 +31,27 @@ * */ -#ifndef GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H -#define GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H +#ifndef GRPC_INTERNAL_CORE_IOMGR_TIMER_HEAP_H +#define GRPC_INTERNAL_CORE_IOMGR_TIMER_HEAP_H -#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/timer.h" typedef struct { - grpc_alarm **alarms; - gpr_uint32 alarm_count; - gpr_uint32 alarm_capacity; -} grpc_alarm_heap; + grpc_timer **timers; + gpr_uint32 timer_count; + gpr_uint32 timer_capacity; +} grpc_timer_heap; -/* return 1 if the new alarm is the first alarm in the heap */ -int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm); +/* return 1 if the new timer is the first timer in the heap */ +int grpc_timer_heap_add(grpc_timer_heap *heap, grpc_timer *timer); -void grpc_alarm_heap_init(grpc_alarm_heap *heap); -void grpc_alarm_heap_destroy(grpc_alarm_heap *heap); +void grpc_timer_heap_init(grpc_timer_heap *heap); +void grpc_timer_heap_destroy(grpc_timer_heap *heap); -void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm); -grpc_alarm *grpc_alarm_heap_top(grpc_alarm_heap *heap); -void grpc_alarm_heap_pop(grpc_alarm_heap *heap); +void grpc_timer_heap_remove(grpc_timer_heap *heap, grpc_timer *timer); +grpc_timer *grpc_timer_heap_top(grpc_timer_heap *heap); +void grpc_timer_heap_pop(grpc_timer_heap *heap); -int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap); +int grpc_timer_heap_is_empty(grpc_timer_heap *heap); -#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TIMER_HEAP_H */ diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/timer_internal.h index 31d840e6f9..f180eca36e 100644 --- a/src/core/iomgr/alarm_internal.h +++ b/src/core/iomgr/timer_internal.h @@ -31,33 +31,33 @@ * */ -#ifndef GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H -#define GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H +#ifndef GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H +#define GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H #include "src/core/iomgr/exec_ctx.h" #include <grpc/support/sync.h> #include <grpc/support/time.h> -/* iomgr internal api for dealing with alarms */ +/* iomgr internal api for dealing with timers */ -/* Check for alarms to be run, and run them. - Return non zero if alarm callbacks were executed. +/* Check for timers to be run, and run them. + Return non zero if timer callbacks were executed. Drops drop_mu if it is non-null before executing callbacks. - If next is non-null, TRY to update *next with the next running alarm - IF that alarm occurs before *next current value. + If next is non-null, TRY to update *next with the next running timer + IF that timer occurs before *next current value. *next is never guaranteed to be updated on any given execution; however, with high probability at least one thread in the system will see an update at any time slice. */ -int grpc_alarm_check(grpc_exec_ctx* exec_ctx, gpr_timespec now, +int grpc_timer_check(grpc_exec_ctx* exec_ctx, gpr_timespec now, gpr_timespec* next); -void grpc_alarm_list_init(gpr_timespec now); -void grpc_alarm_list_shutdown(grpc_exec_ctx* exec_ctx); +void grpc_timer_list_init(gpr_timespec now); +void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx); -gpr_timespec grpc_alarm_list_next_timeout(void); +gpr_timespec grpc_timer_list_next_timeout(void); /* the following must be implemented by each iomgr implementation */ void grpc_kick_poller(void); -#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H */ diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c index 48eb1afb3d..f67379e4fc 100644 --- a/src/core/iomgr/wakeup_fd_eventfd.c +++ b/src/core/iomgr/wakeup_fd_eventfd.c @@ -39,9 +39,11 @@ #include <sys/eventfd.h> #include <unistd.h> -#include "src/core/iomgr/wakeup_fd_posix.h" #include <grpc/support/log.h> +#include "src/core/iomgr/wakeup_fd_posix.h" +#include "src/core/profiling/timers.h" + static void eventfd_create(grpc_wakeup_fd* fd_info) { int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); /* TODO(klempner): Handle failure more gracefully */ @@ -60,9 +62,11 @@ static void eventfd_consume(grpc_wakeup_fd* fd_info) { static void eventfd_wakeup(grpc_wakeup_fd* fd_info) { int err; + GPR_TIMER_BEGIN("eventfd_wakeup", 0); do { err = eventfd_write(fd_info->read_fd, 1); } while (err < 0 && errno == EINTR); + GPR_TIMER_END("eventfd_wakeup", 0); } static void eventfd_destroy(grpc_wakeup_fd* fd_info) { diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c index 2f6c88daac..b49cdd07b3 100644 --- a/src/core/profiling/basic_timers.c +++ b/src/core/profiling/basic_timers.c @@ -44,98 +44,91 @@ #include <grpc/support/thd.h> #include <stdio.h> -typedef enum { - BEGIN = '{', - END = '}', - MARK = '.', - IMPORTANT = '!' -} marker_type; - -typedef struct grpc_timer_entry { +typedef enum { BEGIN = '{', END = '}', MARK = '.' } marker_type; + +typedef struct gpr_timer_entry { gpr_timespec tm; - int tag; const char *tagstr; - marker_type type; - void *id; const char *file; int line; -} grpc_timer_entry; + char type; + gpr_uint8 important; +} gpr_timer_entry; + +#define MAX_COUNT (1024 * 1024 / sizeof(gpr_timer_entry)) -#define MAX_COUNT (1024 * 1024 / sizeof(grpc_timer_entry)) +static __thread gpr_timer_entry g_log[MAX_COUNT]; +static __thread int g_count; +static gpr_once g_once_init = GPR_ONCE_INIT; +static FILE *output_file; -static __thread grpc_timer_entry log[MAX_COUNT]; -static __thread int count; +static void close_output() { fclose(output_file); } + +static void init_output() { + output_file = fopen("latency_trace.txt", "w"); + GPR_ASSERT(output_file); + atexit(close_output); +} static void log_report() { int i; - for (i = 0; i < count; i++) { - grpc_timer_entry *entry = &(log[i]); - printf("GRPC_LAT_PROF %ld.%09d %p %c %d(%s) %p %s %d\n", entry->tm.tv_sec, - entry->tm.tv_nsec, (void *)(gpr_intptr)gpr_thd_currentid(), - entry->type, entry->tag, entry->tagstr, entry->id, entry->file, - entry->line); + gpr_once_init(&g_once_init, init_output); + for (i = 0; i < g_count; i++) { + gpr_timer_entry *entry = &(g_log[i]); + fprintf(output_file, + "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": " + "\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n", + entry->tm.tv_sec, entry->tm.tv_nsec, + (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr, + entry->file, entry->line, entry->important); } /* Now clear out the log */ - count = 0; + g_count = 0; } -static void grpc_timers_log_add(int tag, const char *tagstr, marker_type type, - void *id, const char *file, int line) { - grpc_timer_entry *entry; +static void gpr_timers_log_add(const char *tagstr, marker_type type, + int important, const char *file, int line) { + gpr_timer_entry *entry; /* TODO (vpai) : Improve concurrency */ - if (count == MAX_COUNT) { + if (g_count == MAX_COUNT) { log_report(); } - entry = &log[count++]; + entry = &g_log[g_count++]; entry->tm = gpr_now(GPR_CLOCK_PRECISE); - entry->tag = tag; entry->tagstr = tagstr; entry->type = type; - entry->id = id; entry->file = file; entry->line = line; + entry->important = important != 0; } /* Latency profiler API implementation. */ -void grpc_timer_add_mark(int tag, const char *tagstr, void *id, - const char *file, int line) { - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, tagstr, MARK, id, file, line); - } +void gpr_timer_add_mark(const char *tagstr, int important, const char *file, + int line) { + gpr_timers_log_add(tagstr, MARK, important, file, line); } -void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id, - const char *file, int line) { - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, tagstr, IMPORTANT, id, file, line); - } +void gpr_timer_begin(const char *tagstr, int important, const char *file, + int line) { + gpr_timers_log_add(tagstr, BEGIN, important, file, line); } -void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file, - int line) { - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, tagstr, BEGIN, id, file, line); - } -} - -void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file, - int line) { - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, tagstr, END, id, file, line); - } +void gpr_timer_end(const char *tagstr, int important, const char *file, + int line) { + gpr_timers_log_add(tagstr, END, important, file, line); } /* Basic profiler specific API functions. */ -void grpc_timers_global_init(void) {} +void gpr_timers_global_init(void) {} -void grpc_timers_global_destroy(void) {} +void gpr_timers_global_destroy(void) {} #else /* !GRPC_BASIC_PROFILER */ -void grpc_timers_global_init(void) {} +void gpr_timers_global_init(void) {} -void grpc_timers_global_destroy(void) {} +void gpr_timers_global_destroy(void) {} #endif /* GRPC_BASIC_PROFILER */ diff --git a/src/core/profiling/stap_timers.c b/src/core/profiling/stap_timers.c index 6868a674a9..efcd1af4a1 100644 --- a/src/core/profiling/stap_timers.c +++ b/src/core/profiling/stap_timers.c @@ -42,23 +42,23 @@ #include "src/core/profiling/stap_probes.h" /* Latency profiler API implementation. */ -void grpc_timer_add_mark(int tag, const char *tagstr, void *id, - const char *file, int line) { +void gpr_timer_add_mark(int tag, const char *tagstr, void *id, const char *file, + int line) { _STAP_ADD_MARK(tag); } -void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id, - const char *file, int line) { +void gpr_timer_add_important_mark(int tag, const char *tagstr, void *id, + const char *file, int line) { _STAP_ADD_IMPORTANT_MARK(tag); } -void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file, - int line) { +void gpr_timer_begin(int tag, const char *tagstr, void *id, const char *file, + int line) { _STAP_TIMING_NS_BEGIN(tag); } -void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file, - int line) { +void gpr_timer_end(int tag, const char *tagstr, void *id, const char *file, + int line) { _STAP_TIMING_NS_END(tag); } diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h index a70520408c..0d112e7248 100644 --- a/src/core/profiling/timers.h +++ b/src/core/profiling/timers.h @@ -38,65 +38,28 @@ extern "C" { #endif -void grpc_timers_global_init(void); -void grpc_timers_global_destroy(void); - -void grpc_timer_add_mark(int tag, const char *tagstr, void *id, - const char *file, int line); -void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id, - const char *file, int line); -void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file, - int line); -void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file, - int line); - -enum grpc_profiling_tags { - /* Any GRPC_PTAG_* >= than the threshold won't generate any profiling mark. */ - GRPC_PTAG_IGNORE_THRESHOLD = 1000000, - - /* Re. Protos. */ - GRPC_PTAG_PROTO_SERIALIZE = 100 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_PROTO_DESERIALIZE = 101 + GRPC_PTAG_IGNORE_THRESHOLD, - - /* Re. sockets. */ - GRPC_PTAG_HANDLE_READ = 200 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_SENDMSG = 201 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_RECVMSG = 202 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_POLL_FINISHED = 203 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_TCP_CB_WRITE = 204 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_TCP_WRITE = 205 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_CALL_ON_DONE_RECV = 206 + GRPC_PTAG_IGNORE_THRESHOLD, - - /* C++ */ - GRPC_PTAG_CPP_CALL_CREATED = 300 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_CPP_PERFORM_OPS = 301 + GRPC_PTAG_IGNORE_THRESHOLD, - - /* Transports */ - GRPC_PTAG_HTTP2_UNLOCK = 401 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_HTTP2_UNLOCK_CLEANUP = 402 + GRPC_PTAG_IGNORE_THRESHOLD, - - /* > 1024 Unassigned reserved. For any miscellaneous use. - * Use addition to generate tags from this base or take advantage of the 10 - * zero'd bits for OR-ing. */ - GRPC_PTAG_OTHER_BASE = 1024 -}; +void gpr_timers_global_init(void); +void gpr_timers_global_destroy(void); + +void gpr_timer_add_mark(const char *tagstr, int important, const char *file, + int line); +void gpr_timer_begin(const char *tagstr, int important, const char *file, + int line); +void gpr_timer_end(const char *tagstr, int important, const char *file, + int line); #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER)) /* No profiling. No-op all the things. */ -#define GRPC_TIMER_MARK(tag, id) \ - do { \ - } while (0) - -#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \ - do { \ +#define GPR_TIMER_MARK(tag, important) \ + do { \ } while (0) -#define GRPC_TIMER_BEGIN(tag, id) \ - do { \ +#define GPR_TIMER_BEGIN(tag, important) \ + do { \ } while (0) -#define GRPC_TIMER_END(tag, id) \ - do { \ +#define GPR_TIMER_END(tag, important) \ + do { \ } while (0) #else /* at least one profiler requested... */ @@ -106,28 +69,14 @@ enum grpc_profiling_tags { #endif /* Generic profiling interface. */ -#define GRPC_TIMER_MARK(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_add_mark(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \ - __LINE__); \ - } - -#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_add_important_mark(tag, #tag, ((void *)(gpr_intptr)(id)), \ - __FILE__, __LINE__); \ - } +#define GPR_TIMER_MARK(tag, important) \ + gpr_timer_add_mark(tag, important, __FILE__, __LINE__); -#define GRPC_TIMER_BEGIN(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_begin(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \ - __LINE__); \ - } +#define GPR_TIMER_BEGIN(tag, important) \ + gpr_timer_begin(tag, important, __FILE__, __LINE__); -#define GRPC_TIMER_END(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_end(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ - } +#define GPR_TIMER_END(tag, important) \ + gpr_timer_end(tag, important, __FILE__, __LINE__); #ifdef GRPC_STAP_PROFILER /* Empty placeholder for now. */ @@ -141,6 +90,28 @@ enum grpc_profiling_tags { #ifdef __cplusplus } + +#if (defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER)) +namespace grpc { +class ProfileScope { + public: + ProfileScope(const char *desc, bool important) : desc_(desc) { + GPR_TIMER_BEGIN(desc_, important ? 1 : 0); + } + ~ProfileScope() { GPR_TIMER_END(desc_, 0); } + + private: + const char *const desc_; +}; +} + +#define GPR_TIMER_SCOPE(tag, important) \ + ::grpc::ProfileScope _profile_scope_##__LINE__((tag), (important)) +#else +#define GPR_TIMER_SCOPE(tag, important) \ + do { \ + } while (false) +#endif #endif #endif /* GRPC_CORE_PROFILING_TIMERS_H */ diff --git a/src/core/support/alloc.c b/src/core/support/alloc.c index d2ed82e771..bfcb77956b 100644 --- a/src/core/support/alloc.c +++ b/src/core/support/alloc.c @@ -35,22 +35,32 @@ #include <stdlib.h> #include <grpc/support/port_platform.h> +#include "src/core/profiling/timers.h" void *gpr_malloc(size_t size) { - void *p = malloc(size); + void *p; + GPR_TIMER_BEGIN("gpr_malloc", 0); + p = malloc(size); if (!p) { abort(); } + GPR_TIMER_END("gpr_malloc", 0); return p; } -void gpr_free(void *p) { free(p); } +void gpr_free(void *p) { + GPR_TIMER_BEGIN("gpr_free", 0); + free(p); + GPR_TIMER_END("gpr_free", 0); +} void *gpr_realloc(void *p, size_t size) { + GPR_TIMER_BEGIN("gpr_realloc", 0); p = realloc(p, size); if (!p) { abort(); } + GPR_TIMER_END("gpr_realloc", 0); return p; } diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c index 91c30989ce..39c96feb13 100644 --- a/src/core/support/sync_posix.c +++ b/src/core/support/sync_posix.c @@ -40,14 +40,23 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/profiling/timers.h" void gpr_mu_init(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_init(mu, NULL) == 0); } void gpr_mu_destroy(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_destroy(mu) == 0); } -void gpr_mu_lock(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_lock(mu) == 0); } +void gpr_mu_lock(gpr_mu* mu) { + GPR_TIMER_BEGIN("gpr_mu_lock", 0); + GPR_ASSERT(pthread_mutex_lock(mu) == 0); + GPR_TIMER_END("gpr_mu_lock", 0); +} -void gpr_mu_unlock(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_unlock(mu) == 0); } +void gpr_mu_unlock(gpr_mu* mu) { + GPR_TIMER_BEGIN("gpr_mu_unlock", 0); + GPR_ASSERT(pthread_mutex_unlock(mu) == 0); + GPR_TIMER_END("gpr_mu_unlock", 0); +} int gpr_mu_trylock(gpr_mu* mu) { int err = pthread_mutex_trylock(mu); diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index 78f2c2bb77..02cfca8555 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -63,7 +63,7 @@ static gpr_timespec gpr_from_timespec(struct timespec ts, /** maps gpr_clock_type --> clockid_t for clock_gettime */ static clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC, CLOCK_REALTIME}; -void gpr_time_init(void) {} +void gpr_time_init(void) { gpr_precise_clock_init(); } gpr_timespec gpr_now(gpr_clock_type clock_type) { struct timespec now; @@ -89,6 +89,7 @@ static uint64_t g_time_start; void gpr_time_init(void) { mach_timebase_info_data_t tb = {0, 1}; + gpr_precise_clock_init(); mach_timebase_info(&tb); g_time_scale = tb.numer; g_time_scale /= tb.denom; diff --git a/src/core/support/time_precise.c b/src/core/support/time_precise.c new file mode 100644 index 0000000000..b37517e639 --- /dev/null +++ b/src/core/support/time_precise.c @@ -0,0 +1,89 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <stdio.h> + +#ifdef GRPC_TIMERS_RDTSC +#if defined(__i386__) +static void gpr_get_cycle_counter(long long int *clk) { + long long int ret; + __asm__ volatile("rdtsc" : "=A"(ret)); + *clk = ret; +} + +// ---------------------------------------------------------------- +#elif defined(__x86_64__) || defined(__amd64__) +static void gpr_get_cycle_counter(long long int *clk) { + unsigned long long low, high; + __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); + *clk = (long long)(high << 32) | (long long)low; +} +#endif + +static double cycles_per_second = 0; +static long long int start_cycle; +void gpr_precise_clock_init(void) { + time_t start; + long long end_cycle; + gpr_log(GPR_DEBUG, "Calibrating timers"); + start = time(NULL); + while (time(NULL) == start) + ; + gpr_get_cycle_counter(&start_cycle); + while (time(NULL) <= start + 10) + ; + gpr_get_cycle_counter(&end_cycle); + cycles_per_second = (double)(end_cycle - start_cycle) / 10.0; + gpr_log(GPR_DEBUG, "... cycles_per_second = %f\n", cycles_per_second); +} + +void gpr_precise_clock_now(gpr_timespec *clk) { + long long int counter; + double secs; + gpr_get_cycle_counter(&counter); + secs = (double)(counter - start_cycle) / cycles_per_second; + clk->clock_type = GPR_CLOCK_PRECISE; + clk->tv_sec = (time_t)secs; + clk->tv_nsec = (int)(1e9 * (secs - (double)clk->tv_sec)); +} + +#else /* GRPC_TIMERS_RDTSC */ +void gpr_precise_clock_init(void) {} + +void gpr_precise_clock_now(gpr_timespec *clk) { + *clk = gpr_now(GPR_CLOCK_REALTIME); + clk->clock_type = GPR_CLOCK_PRECISE; +} +#endif /* GRPC_TIMERS_RDTSC */ diff --git a/src/core/support/time_precise.h b/src/core/support/time_precise.h index cd201faab9..80c5000123 100644 --- a/src/core/support/time_precise.h +++ b/src/core/support/time_precise.h @@ -34,60 +34,9 @@ #ifndef GRPC_CORE_SUPPORT_TIME_PRECISE_H_ #define GRPC_CORE_SUPPORT_TIME_PRECISE_H_ -#include <grpc/support/sync.h> #include <grpc/support/time.h> -#include <stdio.h> -#ifdef GRPC_TIMERS_RDTSC -#if defined(__i386__) -static void gpr_get_cycle_counter(long long int *clk) { - long long int ret; - __asm__ volatile("rdtsc" : "=A"(ret)); - *clk = ret; -} - -// ---------------------------------------------------------------- -#elif defined(__x86_64__) || defined(__amd64__) -static void gpr_get_cycle_counter(long long int *clk) { - unsigned long long low, high; - __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); - *clk = (high << 32) | low; -} -#endif - -static gpr_once precise_clock_init = GPR_ONCE_INIT; -static long long cycles_per_second = 0; -static void gpr_precise_clock_init() { - time_t start = time(NULL); - gpr_precise_clock start_cycle; - gpr_precise_clock end_cycle; - while (time(NULL) == start) - ; - gpr_get_cycle_counter(&start_cycle); - while (time(NULL) == start + 1) - ; - gpr_get_cycle_counter(&end_cycle); - cycles_per_second = end_cycle - start_cycle; -} - -static double grpc_precise_clock_scaling_factor() { - gpr_once_init(&precise_clock_init, grpc_precise_clock_init); - return 1e6 / cycles_per_second; -} - -static void gpr_precise_clock_now(gpr_timespec *clk) { - long long int counter; - gpr_get_cycle_counter(&counter); - clk->clock = GPR_CLOCK_REALTIME; - clk->tv_sec = counter / cycles_per_second; - clk->tv_nsec = counter % cycles_per_second; -} - -#else /* GRPC_TIMERS_RDTSC */ -static void gpr_precise_clock_now(gpr_timespec *clk) { - *clk = gpr_now(GPR_CLOCK_REALTIME); - clk->clock_type = GPR_CLOCK_PRECISE; -} -#endif /* GRPC_TIMERS_RDTSC */ +void gpr_precise_clock_init(void); +void gpr_precise_clock_now(gpr_timespec *clk); #endif /* GRPC_CORE_SUPPORT_TIME_PRECISE_ */ diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c new file mode 100644 index 0000000000..7c47dd56f8 --- /dev/null +++ b/src/core/surface/alarm.c @@ -0,0 +1,83 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/timer.h" +#include "src/core/surface/completion_queue.h" +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +struct grpc_alarm { + grpc_timer alarm; + grpc_cq_completion completion; + /** completion queue where events about this alarm will be posted */ + grpc_completion_queue *cq; + /** user supplied tag */ + void *tag; +}; + +static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, + grpc_cq_completion *c) {} + +static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_alarm *alarm = arg; + grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, success, + do_nothing_end_completion, NULL, &alarm->completion); +} + +grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, + void *tag) { + grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm)); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + GRPC_CQ_INTERNAL_REF(cq, "alarm"); + alarm->cq = cq; + alarm->tag = tag; + + grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm, + gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_cq_begin_op(cq); + grpc_exec_ctx_finish(&exec_ctx); + return alarm; +} + +void grpc_alarm_cancel(grpc_alarm *alarm) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_timer_cancel(&exec_ctx, &alarm->alarm); + grpc_exec_ctx_finish(&exec_ctx); +} + +void grpc_alarm_destroy(grpc_alarm *alarm) { + grpc_alarm_cancel(alarm); + GRPC_CQ_INTERNAL_UNREF(alarm->cq, "alarm"); + gpr_free(alarm); +} diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 386a4a6b29..81ff215c0c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -42,7 +42,7 @@ #include <grpc/support/useful.h> #include "src/core/channel/channel_stack.h" -#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" @@ -237,7 +237,7 @@ struct grpc_call { grpc_call_context_element context[GRPC_CONTEXT_COUNT]; /* Deadline alarm - if have_alarm is non-zero */ - grpc_alarm alarm; + grpc_timer alarm; /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; @@ -306,8 +306,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, grpc_transport_stream_op *initial_op_ptr = NULL; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_call *call = - gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); + grpc_call *call; + GPR_TIMER_BEGIN("grpc_call_create", 0); + call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); gpr_mu_init(&call->completion_mu); @@ -401,6 +402,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, set_deadline_alarm(&exec_ctx, call, send_deadline); } grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_call_create", 0); return call; } @@ -461,6 +463,7 @@ void grpc_call_internal_ref(grpc_call *c) { static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) { size_t i; grpc_call *c = call; + GPR_TIMER_BEGIN("destroy_call", 0); grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); gpr_mu_destroy(&c->mu); @@ -493,6 +496,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } gpr_free(c); + GPR_TIMER_END("destroy_call", 0); } #ifdef GRPC_CALL_REF_COUNT_DEBUG @@ -535,12 +539,24 @@ grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( return algorithm; } -static void set_encodings_accepted_by_peer( - grpc_call *call, const gpr_slice accept_encoding_slice) { +static void destroy_encodings_accepted_by_peer(void *p) { return; } + +static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) { size_t i; grpc_compression_algorithm algorithm; gpr_slice_buffer accept_encoding_parts; + gpr_slice accept_encoding_slice; + void *accepted_user_data; + + accepted_user_data = + grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer); + if (accepted_user_data != NULL) { + call->encodings_accepted_by_peer = + (gpr_uint32)(((gpr_uintptr)accepted_user_data) - 1); + return; + } + accept_encoding_slice = mdel->value->slice; gpr_slice_buffer_init(&accept_encoding_parts); gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); @@ -564,6 +580,12 @@ static void set_encodings_accepted_by_peer( gpr_free(accept_encoding_entry_str); } } + + gpr_slice_buffer_destroy(&accept_encoding_parts); + + grpc_mdelem_set_user_data( + mdel, destroy_encodings_accepted_by_peer, + (void *)(((gpr_uintptr)call->encodings_accepted_by_peer) + 1)); } gpr_uint32 grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { @@ -624,6 +646,8 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) { const size_t MAX_RECV_PEEK_AHEAD = 65536; size_t buffered_bytes; + GPR_TIMER_BEGIN("unlock", 0); + memset(&op, 0, sizeof(op)); op.cancel_with_status = call->cancel_with_status; @@ -694,6 +718,8 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) { unlock(exec_ctx, call); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing"); } + + GPR_TIMER_END("unlock", 0); } static void get_final_status(grpc_call *call, grpc_ioreq_data out) { @@ -843,6 +869,7 @@ static void early_out_write_ops(grpc_call *call) { static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *pc, int success) { grpc_call *call = pc; + GPR_TIMER_BEGIN("call_on_done_send", 0); lock(call); if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success); @@ -866,9 +893,11 @@ static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *pc, int success) { call->sending = 0; unlock(exec_ctx, call); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "sending"); + GPR_TIMER_END("call_on_done_send", 0); } static void finish_message(grpc_call *call) { + GPR_TIMER_BEGIN("finish_message", 0); if (call->error_status_set == 0) { /* TODO(ctiller): this could be a lot faster if coded directly */ grpc_byte_buffer *byte_buffer; @@ -888,6 +917,7 @@ static void finish_message(grpc_call *call) { gpr_slice_buffer_reset_and_unref(&call->incoming_message); GPR_ASSERT(call->incoming_message.count == 0); call->reading_message = 0; + GPR_TIMER_END("finish_message", 0); } static int begin_message(grpc_call *call, grpc_begin_message msg) { @@ -977,7 +1007,7 @@ static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) { grpc_call *child_call; grpc_call *next_child_call; size_t i; - GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0); + GPR_TIMER_BEGIN("call_on_done_recv", 0); lock(call); call->receiving = 0; if (success) { @@ -987,13 +1017,19 @@ static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) { case GRPC_NO_OP: break; case GRPC_OP_METADATA: + GPR_TIMER_BEGIN("recv_metadata", 0); recv_metadata(exec_ctx, call, &op->data.metadata); + GPR_TIMER_END("recv_metadata", 0); break; case GRPC_OP_BEGIN_MESSAGE: + GPR_TIMER_BEGIN("begin_message", 0); success = begin_message(call, op->data.begin_message); + GPR_TIMER_END("begin_message", 0); break; case GRPC_OP_SLICE: + GPR_TIMER_BEGIN("add_slice_to_message", 0); success = add_slice_to_message(call, op->data.slice); + GPR_TIMER_END("add_slice_to_message", 0); break; } } @@ -1009,7 +1045,7 @@ static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) { GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); call->read_state = READ_STATE_STREAM_CLOSED; if (call->have_alarm) { - grpc_alarm_cancel(exec_ctx, &call->alarm); + grpc_timer_cancel(exec_ctx, &call->alarm); } /* propagate cancellation to any interested children */ child_call = call->first_child; @@ -1039,7 +1075,7 @@ static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) { unlock(exec_ctx, call); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "receiving"); - GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0); + GPR_TIMER_END("call_on_done_recv", 0); } static int prepare_application_metadata(grpc_call *call, size_t count, @@ -1327,7 +1363,7 @@ void grpc_call_destroy(grpc_call *c) { GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; if (c->have_alarm) { - grpc_alarm_cancel(&exec_ctx, &c->alarm); + grpc_timer_cancel(&exec_ctx, &c->alarm); } cancel = c->read_state != READ_STATE_STREAM_CLOSED; unlock(&exec_ctx, c); @@ -1452,7 +1488,7 @@ static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - grpc_alarm_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call, + grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC)); } @@ -1510,23 +1546,31 @@ static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_array *dest; grpc_metadata *mdusr; int is_trailing; - grpc_mdctx *mdctx = call->metadata_context; is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; for (l = md->list.head; l != NULL; l = l->next) { grpc_mdelem *mdel = l->md; grpc_mdstr *key = mdel->key; if (key == grpc_channel_get_status_string(call->channel)) { + GPR_TIMER_BEGIN("status", 0); set_status_code(call, STATUS_FROM_WIRE, decode_status(mdel)); + GPR_TIMER_END("status", 0); } else if (key == grpc_channel_get_message_string(call->channel)) { + GPR_TIMER_BEGIN("status-details", 0); set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(mdel->value)); + GPR_TIMER_END("status-details", 0); } else if (key == grpc_channel_get_compression_algorithm_string(call->channel)) { + GPR_TIMER_BEGIN("compression_algorithm", 0); set_compression_algorithm(call, decode_compression(mdel)); + GPR_TIMER_END("compression_algorithm", 0); } else if (key == grpc_channel_get_encodings_accepted_by_peer_string( call->channel)) { - set_encodings_accepted_by_peer(call, mdel->value->slice); + GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); + set_encodings_accepted_by_peer(call, mdel); + GPR_TIMER_END("encodings_accepted_by_peer", 0); } else { + GPR_TIMER_BEGIN("report_up", 0); dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); @@ -1547,25 +1591,26 @@ static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call, } call->owned_metadata[call->owned_metadata_count++] = mdel; l->md = NULL; + GPR_TIMER_END("report_up", 0); } } if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { + GPR_TIMER_BEGIN("set_deadline_alarm", 0); set_deadline_alarm(exec_ctx, call, md->deadline); + GPR_TIMER_END("set_deadline_alarm", 0); } if (!is_trailing) { call->read_state = READ_STATE_GOT_INITIAL_METADATA; } - grpc_mdctx_lock(mdctx); for (l = md->list.head; l; l = l->next) { - if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); + if (l->md) GRPC_MDELEM_UNREF(l->md); } for (l = md->garbage.head; l; l = l->next) { - GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); + GRPC_MDELEM_UNREF(l->md); } - grpc_mdctx_unlock(mdctx); } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { @@ -1615,6 +1660,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_call_error error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_call_start_batch", 0); + GRPC_API_TRACE( "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", 5, (call, ops, (unsigned long)nops, tag, reserved)); @@ -1852,6 +1899,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, finish_func, tag); done: grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_call_start_batch", 0); return error; } diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index ca3c02c536..1a2aef64ef 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -37,7 +37,7 @@ #include <grpc/support/log.h> #include "src/core/channel/client_channel.h" -#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/timer.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/completion_queue.h" @@ -78,7 +78,7 @@ typedef struct { int success; int removed; grpc_closure on_complete; - grpc_alarm alarm; + grpc_timer alarm; grpc_connectivity_state state; grpc_completion_queue *cq; grpc_cq_completion completion_storage; @@ -133,7 +133,7 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, gpr_mu_lock(&w->mu); w->success = 1; gpr_mu_unlock(&w->mu); - grpc_alarm_cancel(exec_ctx, &w->alarm); + grpc_timer_cancel(exec_ctx, &w->alarm); } gpr_mu_lock(&w->mu); @@ -195,7 +195,7 @@ void grpc_channel_watch_connectivity_state( w->tag = tag; w->channel = channel; - grpc_alarm_init(&exec_ctx, &w->alarm, + grpc_timer_init(&exec_ctx, &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 9e552c2cdf..aa90b3f7f5 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,15 +36,18 @@ #include <stdio.h> #include <string.h> +#include "src/core/iomgr/timer.h" #include "src/core/iomgr/pollset.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" +#include "src/core/profiling/timers.h" #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> typedef struct { grpc_pollset_worker *worker; @@ -71,6 +74,15 @@ struct grpc_completion_queue { grpc_closure pollset_destroy_done; }; +struct grpc_cq_alarm { + grpc_timer alarm; + grpc_cq_completion completion; + /** completion queue where events about this alarm will be posted */ + grpc_completion_queue *cq; + /** user supplied tag */ + void *tag; +}; + static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *cc, int success); @@ -143,6 +155,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, int i; grpc_pollset_worker *pluck_worker; + GPR_TIMER_BEGIN("grpc_cq_end_op", 0); + storage->tag = tag; storage->done = done; storage->done_arg = done_arg; @@ -174,6 +188,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_destroy_done); } + + GPR_TIMER_END("grpc_cq_end_op", 0); } grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, @@ -184,6 +200,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec now; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); + GRPC_API_TRACE( "grpc_completion_queue_next(" "cc=%p, " @@ -230,6 +248,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); grpc_exec_ctx_finish(&exec_ctx); + + GPR_TIMER_END("grpc_completion_queue_next", 0); + return ret; } @@ -267,6 +288,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, int first_loop = 1; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); + GRPC_API_TRACE( "grpc_completion_queue_pluck(" "cc=%p, tag=%p, " @@ -332,6 +355,9 @@ done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); grpc_exec_ctx_finish(&exec_ctx); + + GPR_TIMER_END("grpc_completion_queue_pluck", 0); + return ret; } diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 95011cab17..715c90a5e1 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -115,7 +115,7 @@ void grpc_init(void) { gpr_log(GPR_ERROR, "Could not initialize census."); } } - grpc_timers_global_init(); + gpr_timers_global_init(); for (i = 0; i < g_number_of_plugins; i++) { if (g_all_of_the_plugins[i].init != NULL) { g_all_of_the_plugins[i].init(); @@ -133,7 +133,7 @@ void grpc_shutdown(void) { if (--g_initializations == 0) { grpc_iomgr_shutdown(); census_shutdown(); - grpc_timers_global_destroy(); + gpr_timers_global_destroy(); grpc_tracer_shutdown(); grpc_resolver_registry_shutdown(); for (i = 0; i < g_number_of_plugins; i++) { diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index a0977ccaf6..5d4d8e70c4 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -35,6 +35,7 @@ #include <string.h> +#include "src/core/profiling/timers.h" #include "src/core/transport/chttp2/http2_errors.h" #include "src/core/transport/chttp2/status_conversion.h" #include "src/core/transport/chttp2/timeout_encoding.h" @@ -68,6 +69,8 @@ void grpc_chttp2_prepare_to_read( grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_parsing *stream_parsing; + GPR_TIMER_BEGIN("grpc_chttp2_prepare_to_read", 0); + transport_parsing->next_stream_id = transport_global->next_stream_id; /* update the parsing view of incoming window */ @@ -89,6 +92,8 @@ void grpc_chttp2_prepare_to_read( stream_parsing->incoming_window = stream_global->incoming_window; } } + + GPR_TIMER_END("grpc_chttp2_prepare_to_read", 0); } void grpc_chttp2_publish_reads( diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 6c7f7a9ea7..24a5d958b8 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -584,7 +584,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, size_t max_take_size; gpr_uint32 curop = 0; gpr_uint32 unref_op; - grpc_mdctx *mdctx = compressor->mdctx; grpc_linked_mdelem *l; int need_unref = 0; gpr_timespec deadline; @@ -650,17 +649,15 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, finish_frame(&st, 1, eof); if (need_unref) { - grpc_mdctx_lock(mdctx); for (unref_op = 0; unref_op < curop; unref_op++) { op = &ops[unref_op]; if (op->type != GRPC_OP_METADATA) continue; for (l = op->data.metadata.list.head; l; l = l->next) { - if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); + if (l->md) GRPC_MDELEM_UNREF(l->md); } for (l = op->data.metadata.garbage.head; l; l = l->next) { - GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); + GRPC_MDELEM_UNREF(l->md); } } - grpc_mdctx_unlock(mdctx); } } diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index d1c9da6df0..69ad8854ba 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -37,6 +37,7 @@ #include <grpc/support/log.h> +#include "src/core/profiling/timers.h" #include "src/core/transport/chttp2/http2_errors.h" static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); @@ -180,6 +181,8 @@ void grpc_chttp2_perform_writes( static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream_writing *stream_writing; + GPR_TIMER_BEGIN("finalize_outbuf", 0); + while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { if (stream_writing->sopb.nops > 0 || @@ -208,6 +211,8 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { } grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } + + GPR_TIMER_END("finalize_outbuf", 0); } void grpc_chttp2_cleanup_writing( diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index de74379546..effc3c4b3b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -510,6 +510,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { + GPR_TIMER_BEGIN("unlock", 0); unlock_check_read_write_state(exec_ctx, t); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { @@ -520,6 +521,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { } gpr_mu_unlock(&t->mu); + GPR_TIMER_END("unlock", 0); } /* @@ -546,6 +548,8 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); + GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0); + lock(t); allow_endpoint_shutdown_locked(exec_ctx, t); @@ -567,12 +571,16 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, unlock(exec_ctx, t); UNREF_TRANSPORT(exec_ctx, t, "writing"); + + GPR_TIMER_END("grpc_chttp2_terminate_writing", 0); } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, int iomgr_success_ignored) { grpc_chttp2_transport *t = gt; + GPR_TIMER_BEGIN("writing_action", 0); grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep); + GPR_TIMER_END("writing_action", 0); } void grpc_chttp2_add_incoming_goaway( @@ -642,6 +650,7 @@ static void maybe_start_some_streams( static void perform_stream_op_locked( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("perform_stream_op_locked", 0); if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -713,6 +722,7 @@ static void perform_stream_op_locked( } grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + GPR_TIMER_END("perform_stream_op_locked", 0); } static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, @@ -1103,6 +1113,8 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { int keep_reading = 0; grpc_chttp2_transport *t = tp; + GPR_TIMER_BEGIN("recv_data", 0); + lock(t); i = 0; GPR_ASSERT(!t->parsing_active); @@ -1113,11 +1125,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { &t->parsing_stream_map); grpc_chttp2_prepare_to_read(&t->global, &t->parsing); gpr_mu_unlock(&t->mu); + GPR_TIMER_BEGIN("recv_data.parse", 0); for (; i < t->read_buffer.count && grpc_chttp2_perform_read(exec_ctx, &t->parsing, t->read_buffer.slices[i]); i++) ; + GPR_TIMER_END("recv_data.parse", 0); gpr_mu_lock(&t->mu); if (i != t->read_buffer.count) { drop_connection(exec_ctx, t); @@ -1154,6 +1168,8 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { } else { UNREF_TRANSPORT(exec_ctx, t, "recv_data"); } + + GPR_TIMER_END("recv_data", 0); } /* diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 3dbb9f0b53..68f23177eb 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -62,6 +62,8 @@ #define REF_MD_LOCKED(s) ref_md_locked((s)) #endif +typedef void (*destroy_user_data_func)(void *user_data); + typedef struct internal_string { /* must be byte compatible with grpc_mdstr */ gpr_slice slice; @@ -88,8 +90,8 @@ typedef struct internal_metadata { /* private only data */ gpr_mu mu_user_data; - void *user_data; - void (*destroy_user_data)(void *user_data); + gpr_atm destroy_user_data; + gpr_atm user_data; grpc_mdctx *context; struct internal_metadata *bucket_next; @@ -158,8 +160,10 @@ static void ref_md_locked(internal_metadata *md DEBUG_ARGS) { grpc_mdstr_as_c_string((grpc_mdstr *)md->key), grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); #endif - if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 1)) { + if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 2)) { md->context->mdtab_free--; + } else { + GPR_ASSERT(1 != gpr_atm_no_barrier_fetch_add(&md->refcnt, -1)); } } @@ -197,12 +201,14 @@ static void discard_metadata(grpc_mdctx *ctx) { for (i = 0; i < ctx->mdtab_capacity; i++) { cur = ctx->mdtab[i]; while (cur) { + void *user_data = (void *)gpr_atm_no_barrier_load(&cur->user_data); GPR_ASSERT(gpr_atm_acq_load(&cur->refcnt) == 0); next = cur->bucket_next; INTERNAL_STRING_UNREF(cur->key); INTERNAL_STRING_UNREF(cur->value); - if (cur->user_data) { - cur->destroy_user_data(cur->user_data); + if (user_data != NULL) { + ((destroy_user_data_func)gpr_atm_no_barrier_load( + &cur->destroy_user_data))(user_data); } gpr_mu_destroy(&cur->mu_user_data); gpr_free(cur); @@ -388,12 +394,14 @@ static void gc_mdtab(grpc_mdctx *ctx) { for (i = 0; i < ctx->mdtab_capacity; i++) { prev_next = &ctx->mdtab[i]; for (md = ctx->mdtab[i]; md; md = next) { + void *user_data = (void *)gpr_atm_no_barrier_load(&md->user_data); next = md->bucket_next; if (gpr_atm_acq_load(&md->refcnt) == 0) { INTERNAL_STRING_UNREF(md->key); INTERNAL_STRING_UNREF(md->value); if (md->user_data) { - md->destroy_user_data(md->user_data); + ((destroy_user_data_func)gpr_atm_no_barrier_load( + &md->destroy_user_data))(user_data); } gpr_free(md); *prev_next = next; @@ -465,12 +473,12 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, /* not found: create a new pair */ md = gpr_malloc(sizeof(internal_metadata)); - gpr_atm_rel_store(&md->refcnt, 1); + gpr_atm_rel_store(&md->refcnt, 2); md->context = ctx; md->key = key; md->value = value; - md->user_data = NULL; - md->destroy_user_data = NULL; + md->user_data = 0; + md->destroy_user_data = 0; md->bucket_next = ctx->mdtab[hash % ctx->mdtab_capacity]; gpr_mu_init(&md->mu_user_data); #ifdef GRPC_METADATA_REFCOUNT_DEBUG @@ -527,15 +535,13 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) { this function - meaning that no adjustment to mdtab_free is necessary, simplifying the logic here to be just an atomic increment */ /* use C assert to have this removed in opt builds */ - assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); + assert(gpr_atm_no_barrier_load(&md->refcnt) >= 2); gpr_atm_no_barrier_fetch_add(&md->refcnt, 1); return gmd; } void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { internal_metadata *md = (internal_metadata *)gmd; - grpc_mdctx *ctx = md->context; - lock(ctx); #ifdef GRPC_METADATA_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "ELM UNREF:%p:%d->%d: '%s' = '%s'", md, @@ -544,11 +550,15 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { grpc_mdstr_as_c_string((grpc_mdstr *)md->key), grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); #endif - assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); - if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { - ctx->mdtab_free++; + if (2 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { + grpc_mdctx *ctx = md->context; + lock(ctx); + if (1 == gpr_atm_no_barrier_load(&md->refcnt)) { + ctx->mdtab_free++; + gpr_atm_no_barrier_store(&md->refcnt, 0); + } + unlock(ctx); } - unlock(ctx); } const char *grpc_mdstr_as_c_string(grpc_mdstr *s) { @@ -584,13 +594,14 @@ size_t grpc_mdctx_get_mdtab_free_test_only(grpc_mdctx *ctx) { return ctx->mdtab_free; } -void *grpc_mdelem_get_user_data(grpc_mdelem *md, - void (*if_destroy_func)(void *)) { +void *grpc_mdelem_get_user_data(grpc_mdelem *md, void (*destroy_func)(void *)) { internal_metadata *im = (internal_metadata *)md; void *result; - gpr_mu_lock(&im->mu_user_data); - result = im->destroy_user_data == if_destroy_func ? im->user_data : NULL; - gpr_mu_unlock(&im->mu_user_data); + if (gpr_atm_acq_load(&im->destroy_user_data) == (gpr_atm)destroy_func) { + return (void *)gpr_atm_no_barrier_load(&im->user_data); + } else { + return NULL; + } return result; } @@ -599,7 +610,7 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), internal_metadata *im = (internal_metadata *)md; GPR_ASSERT((user_data == NULL) == (destroy_func == NULL)); gpr_mu_lock(&im->mu_user_data); - if (im->destroy_user_data) { + if (gpr_atm_no_barrier_load(&im->destroy_user_data)) { /* user data can only be set once */ gpr_mu_unlock(&im->mu_user_data); if (destroy_func != NULL) { @@ -607,8 +618,8 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), } return; } - im->destroy_user_data = destroy_func; - im->user_data = user_data; + gpr_atm_no_barrier_store(&im->user_data, (gpr_atm)user_data); + gpr_atm_rel_store(&im->destroy_user_data, (gpr_atm)destroy_func); gpr_mu_unlock(&im->mu_user_data); } @@ -627,29 +638,6 @@ gpr_slice grpc_mdstr_as_base64_encoded_and_huffman_compressed(grpc_mdstr *gs) { return slice; } -void grpc_mdctx_lock(grpc_mdctx *ctx) { lock(ctx); } - -void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, - grpc_mdelem *gmd DEBUG_ARGS) { - internal_metadata *md = (internal_metadata *)gmd; - grpc_mdctx *elem_ctx = md->context; - GPR_ASSERT(ctx == elem_ctx); -#ifdef GRPC_METADATA_REFCOUNT_DEBUG - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "ELM UNREF:%p:%d->%d: '%s' = '%s'", md, - gpr_atm_no_barrier_load(&md->refcnt), - gpr_atm_no_barrier_load(&md->refcnt) - 1, - grpc_mdstr_as_c_string((grpc_mdstr *)md->key), - grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); -#endif - assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); - if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { - ctx->mdtab_free++; - } -} - -void grpc_mdctx_unlock(grpc_mdctx *ctx) { unlock(ctx); } - static int conforms_to(grpc_mdstr *s, const gpr_uint8 *legal_bits) { const gpr_uint8 *p = GPR_SLICE_START_PTR(s->slice); const gpr_uint8 *e = GPR_SLICE_END_PTR(s->slice); diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h index 136a65f288..9a8164037c 100644 --- a/src/core/transport/metadata.h +++ b/src/core/transport/metadata.h @@ -155,28 +155,6 @@ int grpc_mdstr_is_legal_header(grpc_mdstr *s); int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s); int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s); -/* Batch mode metadata functions. - These API's have equivalents above, but allow taking the mdctx just once, - performing a bunch of work, and then leaving the mdctx. */ - -/* Lock the metadata context: it's only safe to call _locked_ functions against - this context from the calling thread until grpc_mdctx_unlock is called */ -void grpc_mdctx_lock(grpc_mdctx *ctx); -#ifdef GRPC_METADATA_REFCOUNT_DEBUG -#define GRPC_MDCTX_LOCKED_MDELEM_UNREF(ctx, elem) \ - grpc_mdctx_locked_mdelem_unref((ctx), (elem), __FILE__, __LINE__) -/* Unref a metadata element */ -void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *elem, - const char *file, int line); -#else -#define GRPC_MDCTX_LOCKED_MDELEM_UNREF(ctx, elem) \ - grpc_mdctx_locked_mdelem_unref((ctx), (elem)) -/* Unref a metadata element */ -void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *elem); -#endif -/* Unlock the metadata context */ -void grpc_mdctx_unlock(grpc_mdctx *ctx); - #define GRPC_MDSTR_KV_HASH(k_hash, v_hash) (GPR_ROTL((k_hash), 2) ^ (v_hash)) #endif /* GRPC_INTERNAL_CORE_TRANSPORT_METADATA_H */ diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 1cb2bd7c59..6493e77bc5 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -38,6 +38,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/profiling/timers.h" + /* Exponential growth function: Given x, return a larger x. Currently we grow by 1.5 times upon reallocation. */ #define GROW(x) (3 * (x) / 2) @@ -300,6 +302,8 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *batch, grpc_linked_mdelem *l; grpc_linked_mdelem *next; + GPR_TIMER_BEGIN("grpc_metadata_batch_filter", 0); + assert_valid_list(&batch->list); assert_valid_list(&batch->garbage); for (l = batch->list.head; l; l = next) { @@ -328,4 +332,6 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *batch, } assert_valid_list(&batch->list); assert_valid_list(&batch->garbage); + + GPR_TIMER_END("grpc_metadata_batch_filter", 0); } diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index a40268a7f0..99e28ab63b 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -121,7 +121,7 @@ static void store32_little_endian(gpr_uint32 value, unsigned char *buf) { buf[3] = (unsigned char)((value >> 24) & 0xFF); buf[2] = (unsigned char)((value >> 16) & 0xFF); buf[1] = (unsigned char)((value >> 8) & 0xFF); - buf[0] = (unsigned char)((value) & 0xFF); + buf[0] = (unsigned char)((value)&0xFF); } static void tsi_fake_frame_reset(tsi_fake_frame *frame, int needs_draining) { diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 05789f07d4..22b57964cc 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -319,8 +319,9 @@ static tsi_result peer_from_x509(X509 *cert, int include_certificate_type, /* TODO(jboeuf): Maybe add more properties. */ GENERAL_NAMES *subject_alt_names = X509_get_ext_d2i(cert, NID_subject_alt_name, 0, 0); - int subject_alt_name_count = - (subject_alt_names != NULL) ? sk_GENERAL_NAME_num(subject_alt_names) : 0; + int subject_alt_name_count = (subject_alt_names != NULL) + ? (int)sk_GENERAL_NAME_num(subject_alt_names) + : 0; size_t property_count; tsi_result result; GPR_ASSERT(subject_alt_name_count >= 0); @@ -358,7 +359,7 @@ static void log_ssl_error_stack(void) { unsigned long err; while ((err = ERR_get_error()) != 0) { char details[256]; - ERR_error_string_n(err, details, sizeof(details)); + ERR_error_string_n((uint32_t)err, details, sizeof(details)); gpr_log(GPR_ERROR, "%s", details); } } @@ -668,7 +669,7 @@ static tsi_result ssl_protector_protect(tsi_frame_protector *self, tsi_result result = TSI_OK; /* First see if we have some pending data in the SSL BIO. */ - int pending_in_ssl = BIO_pending(impl->from_ssl); + int pending_in_ssl = (int)BIO_pending(impl->from_ssl); if (pending_in_ssl > 0) { *unprotected_bytes_size = 0; GPR_ASSERT(*protected_output_frames_size <= INT_MAX); @@ -726,7 +727,7 @@ static tsi_result ssl_protector_protect_flush( impl->buffer_offset = 0; } - pending = BIO_pending(impl->from_ssl); + pending = (int)BIO_pending(impl->from_ssl); GPR_ASSERT(pending >= 0); *still_pending_size = (size_t)pending; if (*still_pending_size == 0) return TSI_OK; @@ -739,7 +740,7 @@ static tsi_result ssl_protector_protect_flush( return TSI_INTERNAL_ERROR; } *protected_output_frames_size = (size_t)read_from_ssl; - pending = BIO_pending(impl->from_ssl); + pending = (int)BIO_pending(impl->from_ssl); GPR_ASSERT(pending >= 0); *still_pending_size = (size_t)pending; return TSI_OK; diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index dc8e304664..c7974d655b 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -78,7 +78,6 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, context->raw_deadline(), nullptr); } grpc_census_call_set_context(c_call, context->census_context()); - GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call); context->set_call(c_call, shared_from_this()); return Call(c_call, this, cq); } @@ -87,11 +86,9 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; - GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops, ops, nullptr)); - GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); } void* Channel::RegisterMethod(const char* method) { diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc new file mode 100644 index 0000000000..bce0b174f8 --- /dev/null +++ b/src/cpp/common/alarm.cc @@ -0,0 +1,49 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> +#include <grpc++/alarm.h> + +namespace grpc { + +Alarm::Alarm(CompletionQueue* cq, gpr_timespec deadline, void* tag) + : alarm_(grpc_alarm_create(cq->cq(), deadline, tag)) {} + +Alarm::~Alarm() { + grpc_alarm_destroy(alarm_); +} + +void Alarm::Cancel() { + grpc_alarm_cancel(alarm_); +} + +} // namespace grpc diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 4131fbe5e5..b1330fde7f 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -42,6 +42,8 @@ #include <grpc/support/port_platform.h> #include <grpc++/support/config.h> +#include "src/core/profiling/timers.h" + const int kMaxBufferLength = 8192; class GrpcBufferWriter GRPC_FINAL @@ -158,6 +160,7 @@ namespace grpc { Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) { + GPR_TIMER_SCOPE("SerializeProto", 0); int byte_size = msg.ByteSize(); if (byte_size <= kMaxBufferLength) { gpr_slice slice = gpr_slice_malloc(byte_size); @@ -176,6 +179,7 @@ Status SerializeProto(const grpc::protobuf::Message& msg, Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) { + GPR_TIMER_SCOPE("DeserializeProto", 0); if (!buffer) { return Status(StatusCode::INTERNAL, "No payload"); } diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index e09744b842..f5063a079e 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -540,6 +540,7 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. bool ok; + GPR_TIMER_SCOPE("Server::RunRpc", 0); auto* mrd = SyncRequest::Wait(&cq_, &ok); if (mrd) { ScheduleCallback(); @@ -555,6 +556,7 @@ void Server::RunRpc() { mrd->TeardownRequest(); } } + GPR_TIMER_SCOPE("cd.Run()", 0); cd.Run(); } } diff --git a/src/csharp/Grpc.Auth/AuthInterceptors.cs b/src/csharp/Grpc.Auth/GoogleAuthInterceptors.cs index fa92566775..1c14c5bb5b 100644 --- a/src/csharp/Grpc.Auth/AuthInterceptors.cs +++ b/src/csharp/Grpc.Auth/GoogleAuthInterceptors.cs @@ -41,10 +41,10 @@ using Grpc.Core.Utils; namespace Grpc.Auth { /// <summary> - /// Factory methods to create authorization interceptors. - /// <seealso cref="GrpcCredentials"/> + /// Factory methods to create authorization interceptors for Google credentials. + /// <seealso cref="GoogleGrpcCredentials"/> /// </summary> - public static class AuthInterceptors + public static class GoogleAuthInterceptors { private const string AuthorizationHeader = "Authorization"; private const string Schema = "Bearer"; diff --git a/src/csharp/Grpc.Auth/GrpcCredentials.cs b/src/csharp/Grpc.Auth/GoogleGrpcCredentials.cs index d8b10804c6..a1e7db13bd 100644 --- a/src/csharp/Grpc.Auth/GrpcCredentials.cs +++ b/src/csharp/Grpc.Auth/GoogleGrpcCredentials.cs @@ -33,6 +33,7 @@ using System; using System.Threading; +using System.Threading.Tasks; using Google.Apis.Auth.OAuth2; using Grpc.Core; @@ -41,53 +42,55 @@ using Grpc.Core.Utils; namespace Grpc.Auth { /// <summary> - /// Factory methods to create instances of <see cref="ChannelCredentials"/> and <see cref="CallCredentials"/> classes. + /// Factory/extension methods to create instances of <see cref="ChannelCredentials"/> and <see cref="CallCredentials"/> classes + /// based on credential objects originating from Google auth library. /// </summary> - public static class GrpcCredentials + public static class GoogleGrpcCredentials { /// <summary> - /// Creates a <see cref="MetadataCredentials"/> instance that will obtain access tokens - /// from any credential that implements <c>ITokenAccess</c>. (e.g. <c>GoogleCredential</c>). + /// Retrieves an instance of Google's Application Default Credentials using + /// <c>GoogleCredential.GetApplicationDefaultAsync()</c> and converts them + /// into a gRPC <see cref="ChannelCredentials"/> that use the default SSL credentials. /// </summary> - /// <param name="credential">The credential to use to obtain access tokens.</param> - /// <returns>The <c>MetadataCredentials</c> instance.</returns> - public static MetadataCredentials Create(ITokenAccess credential) + /// <returns>The <c>ChannelCredentials</c> instance.</returns> + public static async Task<ChannelCredentials> GetApplicationDefaultAsync() { - return new MetadataCredentials(AuthInterceptors.FromCredential(credential)); + var googleCredential = await GoogleCredential.GetApplicationDefaultAsync().ConfigureAwait(false); + return googleCredential.ToChannelCredentials(); } /// <summary> - /// Convenience method to create a <see cref="ChannelCredentials"/> instance from - /// <c>ITokenAccess</c> credential and <c>SslCredentials</c> instance. + /// Creates an instance of <see cref="CallCredentials"/> that will use given access token to authenticate + /// with a gRPC service. /// </summary> - /// <param name="credential">The credential to use to obtain access tokens.</param> - /// <param name="sslCredentials">The <c>SslCredentials</c> instance.</param> - /// <returns>The channel credentials for access token based auth over a secure channel.</returns> - public static ChannelCredentials Create(ITokenAccess credential, SslCredentials sslCredentials) + /// <param name="accessToken">OAuth2 access token.</param> + /// /// <returns>The <c>MetadataCredentials</c> instance.</returns> + public static CallCredentials FromAccessToken(string accessToken) { - return ChannelCredentials.Create(sslCredentials, Create(credential)); + return CallCredentials.FromInterceptor(GoogleAuthInterceptors.FromAccessToken(accessToken)); } /// <summary> - /// Creates an instance of <see cref="MetadataCredentials"/> that will use given access token to authenticate - /// with a gRPC service. + /// Converts a <c>ITokenAccess</c> (e.g. <c>GoogleCredential</c>) object + /// into a gRPC <see cref="CallCredentials"/> object. /// </summary> - /// <param name="accessToken">OAuth2 access token.</param> - /// /// <returns>The <c>MetadataCredentials</c> instance.</returns> - public static MetadataCredentials FromAccessToken(string accessToken) + /// <param name="credential">The credential to use to obtain access tokens.</param> + /// <returns>The <c>CallCredentials</c> instance.</returns> + public static CallCredentials ToCallCredentials(this ITokenAccess credential) { - return new MetadataCredentials(AuthInterceptors.FromAccessToken(accessToken)); + return CallCredentials.FromInterceptor(GoogleAuthInterceptors.FromCredential(credential)); } /// <summary> - /// Converts a <c>ITokenAccess</c> object into a <see cref="MetadataCredentials"/> object supported - /// by gRPC. + /// Converts a <c>ITokenAccess</c> (e.g. <c>GoogleCredential</c>) object + /// into a gRPC <see cref="ChannelCredentials"/> object. + /// Default SSL credentials are used. /// </summary> - /// <param name="credential"></param> - /// <returns></returns> - public static MetadataCredentials ToGrpcCredentials(this ITokenAccess credential) + /// <param name="googleCredential">The credential to use to obtain access tokens.</param> + /// <returns>>The <c>ChannelCredentials</c> instance.</returns> + public static ChannelCredentials ToChannelCredentials(this ITokenAccess googleCredential) { - return GrpcCredentials.Create(credential); + return ChannelCredentials.Create(new SslCredentials(), googleCredential.ToCallCredentials()); } } } diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj index 80ab07d2ae..55bde6e194 100644 --- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj +++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj @@ -78,9 +78,9 @@ <Compile Include="..\Grpc.Core\Version.cs"> <Link>Version.cs</Link> </Compile> - <Compile Include="GrpcCredentials.cs" /> + <Compile Include="GoogleGrpcCredentials.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> - <Compile Include="AuthInterceptors.cs" /> + <Compile Include="GoogleAuthInterceptors.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/CallCredentialsTest.cs b/src/csharp/Grpc.Core.Tests/CallCredentialsTest.cs index 451963229a..82f881969e 100644 --- a/src/csharp/Grpc.Core.Tests/CallCredentialsTest.cs +++ b/src/csharp/Grpc.Core.Tests/CallCredentialsTest.cs @@ -55,8 +55,8 @@ namespace Grpc.Core.Tests public void CallCredentials_ToNativeCredentials() { var composite = CallCredentials.Compose( - new MetadataCredentials(async (uri, m) => { await Task.Delay(1); }), - new MetadataCredentials(async (uri, m) => { await Task.Delay(2); })); + CallCredentials.FromInterceptor(async (uri, m) => { await Task.Delay(1); }), + CallCredentials.FromInterceptor(async (uri, m) => { await Task.Delay(2); })); using (var nativeComposite = composite.ToNativeCredentials()) { } diff --git a/src/csharp/Grpc.Core.Tests/ChannelCredentialsTest.cs b/src/csharp/Grpc.Core.Tests/ChannelCredentialsTest.cs index 489bf38575..d5315ed39b 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelCredentialsTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelCredentialsTest.cs @@ -63,11 +63,5 @@ namespace Grpc.Core.Tests // forbid composing non-composable Assert.Throws(typeof(ArgumentException), () => ChannelCredentials.Create(new FakeChannelCredentials(false), new FakeCallCredentials())); } - - [Test] - public void ChannelCredentials_CreateWrapped() - { - ChannelCredentials.Create(new FakeCallCredentials()); - } } } diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 68279a2007..e58528ff50 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -210,7 +210,7 @@ namespace Grpc.Core.Tests }); var callDetails = helper.CreateUnaryCall(); - BenchmarkUtil.RunBenchmark(100, 100, + BenchmarkUtil.RunBenchmark(1, 10, () => { Calls.BlockingUnaryCall(callDetails, "ABC"); }); } diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs index 714c2f7494..073c502daf 100644 --- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs +++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs @@ -60,7 +60,7 @@ namespace Grpc.Core.Tests public void CompletionQueueCreateDestroyBenchmark() { BenchmarkUtil.RunBenchmark( - 100000, 1000000, + 10, 10, () => { CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create(); diff --git a/src/csharp/Grpc.Core/CallCredentials.cs b/src/csharp/Grpc.Core/CallCredentials.cs index 809c9f412d..400a9825de 100644 --- a/src/csharp/Grpc.Core/CallCredentials.cs +++ b/src/csharp/Grpc.Core/CallCredentials.cs @@ -41,6 +41,14 @@ using Grpc.Core.Utils; namespace Grpc.Core { /// <summary> + /// Asynchronous authentication interceptor for <see cref="CallCredentials"/>. + /// </summary> + /// <param name="authUri">URL of a service to which current remote call needs to authenticate</param> + /// <param name="metadata">Metadata to populate with entries that will be added to outgoing call's headers.</param> + /// <returns></returns> + public delegate Task AsyncAuthInterceptor(string authUri, Metadata metadata); + + /// <summary> /// Client-side call credentials. Provide authorization with per-call granularity. /// </summary> public abstract class CallCredentials @@ -57,6 +65,16 @@ namespace Grpc.Core } /// <summary> + /// Creates a new instance of <c>CallCredentials</c> class from an + /// interceptor that can attach metadata to outgoing calls. + /// </summary> + /// <param name="interceptor">authentication interceptor</param> + public static CallCredentials FromInterceptor(AsyncAuthInterceptor interceptor) + { + return new MetadataCredentials(interceptor); + } + + /// <summary> /// Creates native object for the credentials. /// </summary> /// <returns>The native credentials.</returns> @@ -64,18 +82,10 @@ namespace Grpc.Core } /// <summary> - /// Asynchronous authentication interceptor for <see cref="MetadataCredentials"/>. - /// </summary> - /// <param name="authUri">URL of a service to which current remote call needs to authenticate</param> - /// <param name="metadata">Metadata to populate with entries that will be added to outgoing call's headers.</param> - /// <returns></returns> - public delegate Task AsyncAuthInterceptor(string authUri, Metadata metadata); - - /// <summary> /// Client-side credentials that delegate metadata based auth to an interceptor. /// The interceptor is automatically invoked for each remote call that uses <c>MetadataCredentials.</c> /// </summary> - public class MetadataCredentials : CallCredentials + internal sealed class MetadataCredentials : CallCredentials { readonly AsyncAuthInterceptor interceptor; @@ -85,7 +95,7 @@ namespace Grpc.Core /// <param name="interceptor">authentication interceptor</param> public MetadataCredentials(AsyncAuthInterceptor interceptor) { - this.interceptor = interceptor; + this.interceptor = Preconditions.CheckNotNull(interceptor); } internal override CredentialsSafeHandle ToNativeCredentials() diff --git a/src/csharp/Grpc.Core/ChannelCredentials.cs b/src/csharp/Grpc.Core/ChannelCredentials.cs index 599674e02b..9d2bcdabe8 100644 --- a/src/csharp/Grpc.Core/ChannelCredentials.cs +++ b/src/csharp/Grpc.Core/ChannelCredentials.cs @@ -72,17 +72,6 @@ namespace Grpc.Core } /// <summary> - /// Creates a new instance of <c>ChannelCredentials</c> by wrapping - /// an instance of <c>CallCredentials</c>. - /// </summary> - /// <param name="callCredentials">Call credentials.</param> - /// <returns>The <c>ChannelCredentials</c> wrapping given call credentials.</returns> - public static ChannelCredentials Create(CallCredentials callCredentials) - { - return new WrappedCallCredentials(callCredentials); - } - - /// <summary> /// Creates native object for the credentials. May return null if insecure channel /// should be created. /// </summary> @@ -213,26 +202,4 @@ namespace Grpc.Core } } } - - /// <summary> - /// Credentials wrapping <see cref="CallCredentials"/> as <see cref="ChannelCredentials"/>. - /// </summary> - internal sealed class WrappedCallCredentials : ChannelCredentials - { - readonly CallCredentials callCredentials; - - /// <summary> - /// Wraps instance of <c>CallCredentials</c> as <c>ChannelCredentials</c>. - /// </summary> - /// <param name="callCredentials">credentials to wrap</param> - public WrappedCallCredentials(CallCredentials callCredentials) - { - this.callCredentials = Preconditions.CheckNotNull(callCredentials); - } - - internal override CredentialsSafeHandle ToNativeCredentials() - { - return callCredentials.ToNativeCredentials(); - } - } } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index cb50b44841..030a098cad 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -143,14 +143,14 @@ namespace Grpc.IntegrationTesting { var googleCredential = await GoogleCredential.GetApplicationDefaultAsync(); Assert.IsTrue(googleCredential.IsCreateScopedRequired); - credentials = ChannelCredentials.Create(credentials, googleCredential.ToGrpcCredentials()); + credentials = ChannelCredentials.Create(credentials, googleCredential.ToCallCredentials()); } if (options.TestCase == "compute_engine_creds") { var googleCredential = await GoogleCredential.GetApplicationDefaultAsync(); Assert.IsFalse(googleCredential.IsCreateScopedRequired); - credentials = ChannelCredentials.Create(credentials, googleCredential.ToGrpcCredentials()); + credentials = ChannelCredentials.Create(credentials, googleCredential.ToCallCredentials()); } return credentials; } @@ -392,7 +392,7 @@ namespace Grpc.IntegrationTesting ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { oauthScope }); string oauth2Token = await credential.GetAccessTokenForRequestAsync(); - var credentials = GrpcCredentials.FromAccessToken(oauth2Token); + var credentials = GoogleGrpcCredentials.FromAccessToken(oauth2Token); var request = new SimpleRequest { FillUsername = true, @@ -412,7 +412,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("running per_rpc_creds"); ITokenAccess googleCredential = await GoogleCredential.GetApplicationDefaultAsync(); - var credentials = GrpcCredentials.Create(googleCredential); + var credentials = googleCredential.ToCallCredentials(); var request = new SimpleRequest { FillUsername = true, diff --git a/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs index 5325b2fa14..3d56678b99 100644 --- a/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs @@ -75,7 +75,7 @@ namespace Grpc.IntegrationTesting var clientCredentials = ChannelCredentials.Create( new SslCredentials(File.ReadAllText(TestCredentials.ClientCertAuthorityPath)), - new MetadataCredentials(asyncAuthInterceptor)); + CallCredentials.FromInterceptor(asyncAuthInterceptor)); channel = new Channel(Host, server.Ports.Single().BoundPort, clientCredentials, options); client = TestService.NewClient(channel); } diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index b63e294f9a..fe11905109 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -83,6 +83,18 @@ using v8::Value; Callback *Call::constructor; Persistent<FunctionTemplate> Call::fun_tpl; +/** + * Helper function for throwing errors with a grpc_call_error value. + * Modified from the answer by Gus Goose to + * http://stackoverflow.com/questions/31794200. + */ +Local<Value> nanErrorWithCode(const char *msg, grpc_call_error code) { + EscapableHandleScope scope; + Local<Object> err = Nan::Error(msg).As<Object>(); + Nan::Set(err, Nan::New("code").ToLocalChecked(), Nan::New<Uint32>(code)); + return scope.Escape(err); +} + bool EndsWith(const char *str, const char *substr) { return strcmp(str+strlen(str)-strlen(substr), substr) == 0; } @@ -712,7 +724,11 @@ NAN_METHOD(Call::CancelWithStatus) { Call *call = ObjectWrap::Unwrap<Call>(info.This()); grpc_status_code code = static_cast<grpc_status_code>( Nan::To<uint32_t>(info[0]).FromJust()); - Utf8String details(info[0]); + if (code == GRPC_STATUS_OK) { + return Nan::ThrowRangeError( + "cancelWithStatus cannot be called with OK status"); + } + Utf8String details(info[1]); grpc_call_cancel_with_status(call->wrapped_call, code, *details, NULL); } diff --git a/src/node/ext/call.h b/src/node/ext/call.h index dd6c38e4f8..1e3c3ba18d 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -53,18 +53,7 @@ using std::shared_ptr; typedef Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>> PersistentValue; -/** - * Helper function for throwing errors with a grpc_call_error value. - * Modified from the answer by Gus Goose to - * http://stackoverflow.com/questions/31794200. - */ -inline v8::Local<v8::Value> nanErrorWithCode(const char *msg, - grpc_call_error code) { - Nan::EscapableHandleScope scope; - v8::Local<v8::Object> err = Nan::Error(msg).As<v8::Object>(); - Nan::Set(err, Nan::New("code").ToLocalChecked(), Nan::New<v8::Uint32>(code)); - return scope.Escape(err); -} +v8::Local<v8::Value> nanErrorWithCode(const char *msg, grpc_call_error code); v8::Local<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array); diff --git a/src/node/ext/call_credentials.cc b/src/node/ext/call_credentials.cc index 839bb567e4..ff16a1f122 100644 --- a/src/node/ext/call_credentials.cc +++ b/src/node/ext/call_credentials.cc @@ -126,16 +126,9 @@ NAN_METHOD(CallCredentials::New) { info.GetReturnValue().Set(info.This()); return; } else { - const int argc = 1; - Local<Value> argv[argc] = {info[0]}; - MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( - argc, argv); - if (maybe_instance.IsEmpty()) { - // There's probably a pending exception - return; - } else { - info.GetReturnValue().Set(maybe_instance.ToLocalChecked()); - } + // This should never be called directly + return Nan::ThrowTypeError( + "CallCredentials can only be created with the provided functions"); } } diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index a328c01713..584a0cf8ab 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -71,6 +71,72 @@ using v8::Value; Callback *Channel::constructor; Persistent<FunctionTemplate> Channel::fun_tpl; +bool ParseChannelArgs(Local<Value> args_val, + grpc_channel_args **channel_args_ptr) { + if (args_val->IsUndefined() || args_val->IsNull()) { + *channel_args_ptr = NULL; + return true; + } + if (!args_val->IsObject()) { + *channel_args_ptr = NULL; + return false; + } + grpc_channel_args *channel_args = reinterpret_cast<grpc_channel_args*>( + malloc(sizeof(channel_args))); + *channel_args_ptr = channel_args; + Local<Object> args_hash = Nan::To<Object>(args_val).ToLocalChecked(); + Local<Array> keys = Nan::GetOwnPropertyNames(args_hash).ToLocalChecked(); + channel_args->num_args = keys->Length(); + channel_args->args = reinterpret_cast<grpc_arg *>( + calloc(channel_args->num_args, sizeof(grpc_arg))); + for (unsigned int i = 0; i < channel_args->num_args; i++) { + Local<Value> key = Nan::Get(keys, i).ToLocalChecked(); + Utf8String key_str(key); + if (*key_str == NULL) { + // Key string onversion failed + return false; + } + Local<Value> value = Nan::Get(args_hash, key).ToLocalChecked(); + if (value->IsInt32()) { + channel_args->args[i].type = GRPC_ARG_INTEGER; + channel_args->args[i].value.integer = Nan::To<int32_t>(value).FromJust(); + } else if (value->IsString()) { + Utf8String val_str(value); + channel_args->args[i].type = GRPC_ARG_STRING; + channel_args->args[i].value.string = reinterpret_cast<char*>( + calloc(val_str.length() + 1,sizeof(char))); + memcpy(channel_args->args[i].value.string, + *val_str, val_str.length() + 1); + } else { + // The value does not match either of the accepted types + return false; + } + channel_args->args[i].key = reinterpret_cast<char*>( + calloc(key_str.length() + 1, sizeof(char))); + memcpy(channel_args->args[i].key, *key_str, key_str.length() + 1); + } + return true; +} + +void DeallocateChannelArgs(grpc_channel_args *channel_args) { + if (channel_args == NULL) { + return; + } + for (size_t i = 0; i < channel_args->num_args; i++) { + if (channel_args->args[i].key == NULL) { + /* NULL key implies that this argument and all subsequent arguments failed + * to parse */ + break; + } + free(channel_args->args[i].key); + if (channel_args->args[i].type == GRPC_ARG_STRING) { + free(channel_args->args[i].value.string); + } + } + free(channel_args->args); + free(channel_args); +} + Channel::Channel(grpc_channel *channel) : wrapped_channel(channel) {} Channel::~Channel() { @@ -119,49 +185,11 @@ NAN_METHOD(Channel::New) { ChannelCredentials *creds_object = ObjectWrap::Unwrap<ChannelCredentials>( Nan::To<Object>(info[1]).ToLocalChecked()); creds = creds_object->GetWrappedCredentials(); - grpc_channel_args *channel_args_ptr; - if (info[2]->IsUndefined()) { - channel_args_ptr = NULL; - wrapped_channel = grpc_insecure_channel_create(*host, NULL, NULL); - } else if (info[2]->IsObject()) { - Local<Object> args_hash = Nan::To<Object>(info[2]).ToLocalChecked(); - Local<Array> keys(Nan::GetOwnPropertyNames(args_hash).ToLocalChecked()); - grpc_channel_args channel_args; - channel_args.num_args = keys->Length(); - channel_args.args = reinterpret_cast<grpc_arg *>( - calloc(channel_args.num_args, sizeof(grpc_arg))); - /* These are used to keep all strings until then end of the block, then - destroy them */ - std::vector<Nan::Utf8String *> key_strings(keys->Length()); - std::vector<Nan::Utf8String *> value_strings(keys->Length()); - for (unsigned int i = 0; i < channel_args.num_args; i++) { - MaybeLocal<String> maybe_key = Nan::To<String>( - Nan::Get(keys, i).ToLocalChecked()); - if (maybe_key.IsEmpty()) { - free(channel_args.args); - return Nan::ThrowTypeError("Arg keys must be strings"); - } - Local<String> current_key = maybe_key.ToLocalChecked(); - Local<Value> current_value = Nan::Get(args_hash, - current_key).ToLocalChecked(); - key_strings[i] = new Nan::Utf8String(current_key); - channel_args.args[i].key = **key_strings[i]; - if (current_value->IsInt32()) { - channel_args.args[i].type = GRPC_ARG_INTEGER; - channel_args.args[i].value.integer = Nan::To<int32_t>( - current_value).FromJust(); - } else if (current_value->IsString()) { - channel_args.args[i].type = GRPC_ARG_STRING; - value_strings[i] = new Nan::Utf8String(current_value); - channel_args.args[i].value.string = **value_strings[i]; - } else { - free(channel_args.args); - return Nan::ThrowTypeError("Arg values must be strings"); - } - } - channel_args_ptr = &channel_args; - } else { - return Nan::ThrowTypeError("Channel expects a string and an object"); + grpc_channel_args *channel_args_ptr = NULL; + if (!ParseChannelArgs(info[2], &channel_args_ptr)) { + DeallocateChannelArgs(channel_args_ptr); + return Nan::ThrowTypeError("Channel options must be an object with " + "string keys and integer or string values"); } if (creds == NULL) { wrapped_channel = grpc_insecure_channel_create(*host, channel_args_ptr, @@ -170,9 +198,7 @@ NAN_METHOD(Channel::New) { wrapped_channel = grpc_secure_channel_create(creds, *host, channel_args_ptr, NULL); } - if (channel_args_ptr != NULL) { - free(channel_args_ptr->args); - } + DeallocateChannelArgs(channel_args_ptr); Channel *channel = new Channel(wrapped_channel); channel->Wrap(info.This()); info.GetReturnValue().Set(info.This()); diff --git a/src/node/ext/channel.h b/src/node/ext/channel.h index 0062fd03f4..9ec28e15af 100644 --- a/src/node/ext/channel.h +++ b/src/node/ext/channel.h @@ -41,6 +41,11 @@ namespace grpc { namespace node { +bool ParseChannelArgs(v8::Local<v8::Value> args_val, + grpc_channel_args **channel_args_ptr); + +void DeallocateChannelArgs(grpc_channel_args *channel_args); + /* Wrapper class for grpc_channel structs */ class Channel : public Nan::ObjectWrap { public: diff --git a/src/node/ext/channel_credentials.cc b/src/node/ext/channel_credentials.cc index 3d47ff293d..7ca3b9816c 100644 --- a/src/node/ext/channel_credentials.cc +++ b/src/node/ext/channel_credentials.cc @@ -127,16 +127,9 @@ NAN_METHOD(ChannelCredentials::New) { info.GetReturnValue().Set(info.This()); return; } else { - const int argc = 1; - Local<Value> argv[argc] = {info[0]}; - MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( - argc, argv); - if (maybe_instance.IsEmpty()) { - // There's probably a pending exception - return; - } else { - info.GetReturnValue().Set(maybe_instance.ToLocalChecked()); - } + // This should never be called directly + return Nan::ThrowTypeError( + "ChannelCredentials can only be created with the provided functions"); } } diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 87363fc446..b9e1fe9160 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -182,49 +182,14 @@ NAN_METHOD(Server::New) { } grpc_server *wrapped_server; grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue(); - if (info[0]->IsUndefined()) { - wrapped_server = grpc_server_create(NULL, NULL); - } else if (info[0]->IsObject()) { - Local<Object> args_hash = Nan::To<Object>(info[0]).ToLocalChecked(); - Local<Array> keys = Nan::GetOwnPropertyNames(args_hash).ToLocalChecked(); - grpc_channel_args channel_args; - channel_args.num_args = keys->Length(); - channel_args.args = reinterpret_cast<grpc_arg *>( - calloc(channel_args.num_args, sizeof(grpc_arg))); - /* These are used to keep all strings until then end of the block, then - destroy them */ - std::vector<Utf8String *> key_strings(keys->Length()); - std::vector<Utf8String *> value_strings(keys->Length()); - for (unsigned int i = 0; i < channel_args.num_args; i++) { - MaybeLocal<String> maybe_key = Nan::To<String>( - Nan::Get(keys, i).ToLocalChecked()); - if (maybe_key.IsEmpty()) { - free(channel_args.args); - return Nan::ThrowTypeError("Arg keys must be strings"); - } - Local<String> current_key = maybe_key.ToLocalChecked(); - Local<Value> current_value = Nan::Get(args_hash, - current_key).ToLocalChecked(); - key_strings[i] = new Utf8String(current_key); - channel_args.args[i].key = **key_strings[i]; - if (current_value->IsInt32()) { - channel_args.args[i].type = GRPC_ARG_INTEGER; - channel_args.args[i].value.integer = Nan::To<int32_t>( - current_value).FromJust(); - } else if (current_value->IsString()) { - channel_args.args[i].type = GRPC_ARG_STRING; - value_strings[i] = new Utf8String(current_value); - channel_args.args[i].value.string = **value_strings[i]; - } else { - free(channel_args.args); - return Nan::ThrowTypeError("Arg values must be strings"); - } - } - wrapped_server = grpc_server_create(&channel_args, NULL); - free(channel_args.args); - } else { - return Nan::ThrowTypeError("Server expects an object"); + grpc_channel_args *channel_args; + if (!ParseChannelArgs(info[0], &channel_args)) { + DeallocateChannelArgs(channel_args); + return Nan::ThrowTypeError("Server options must be an object with " + "string keys and integer or string values"); } + wrapped_server = grpc_server_create(channel_args, NULL); + DeallocateChannelArgs(channel_args); grpc_server_register_completion_queue(wrapped_server, queue, NULL); Server *server = new Server(wrapped_server); server->Wrap(info.This()); diff --git a/src/node/ext/server_credentials.cc b/src/node/ext/server_credentials.cc index 5e922bd877..e6c55e263c 100644 --- a/src/node/ext/server_credentials.cc +++ b/src/node/ext/server_credentials.cc @@ -117,7 +117,7 @@ NAN_METHOD(ServerCredentials::New) { if (info.IsConstructCall()) { if (!info[0]->IsExternal()) { return Nan::ThrowTypeError( - "ServerCredentials can only be created with the provide functions"); + "ServerCredentials can only be created with the provided functions"); } Local<External> ext = info[0].As<External>(); grpc_server_credentials *creds_value = @@ -126,16 +126,9 @@ NAN_METHOD(ServerCredentials::New) { credentials->Wrap(info.This()); info.GetReturnValue().Set(info.This()); } else { - const int argc = 1; - Local<Value> argv[argc] = {info[0]}; - MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( - argc, argv); - if (maybe_instance.IsEmpty()) { - // There's probably a pending exception - return; - } else { - info.GetReturnValue().Set(maybe_instance.ToLocalChecked()); - } + // This should never be called directly + return Nan::ThrowTypeError( + "ServerCredentials can only be created with the provided functions"); } } diff --git a/src/node/index.js b/src/node/index.js index 591d9dd915..0d1a7fd887 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -33,6 +33,14 @@ 'use strict'; +var path = require('path'); + +var SSL_ROOTS_PATH = path.resolve(__dirname, '..', '..', 'etc', 'roots.pem'); + +if (!process.env.GRPC_DEFAULT_SSL_ROOTS_FILE_PATH) { + process.env.GRPC_DEFAULT_SSL_ROOTS_FILE_PATH = SSL_ROOTS_PATH; +} + var _ = require('lodash'); var ProtoBuf = require('protobufjs'); diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index cb55083d1a..b5061895cf 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -35,7 +35,6 @@ var fs = require('fs'); var path = require('path'); -var _ = require('lodash'); var grpc = require('..'); var testProto = grpc.load({ root: __dirname + '/../../..', diff --git a/src/node/src/client.js b/src/node/src/client.js index 909376e766..596ea5ebb0 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -661,6 +661,7 @@ exports.waitForClientReady = function(client, deadline, callback) { var checkState = function(err) { if (err) { callback(new Error('Failed to connect before the deadline')); + return; } var new_state = client.$channel.getConnectivityState(true); if (new_state === grpc.connectivityState.READY) { diff --git a/src/node/src/common.js b/src/node/src/common.js index 5551ebeec8..ebaaa13db0 100644 --- a/src/node/src/common.js +++ b/src/node/src/common.js @@ -87,14 +87,9 @@ exports.fullyQualifiedName = function fullyQualifiedName(value) { return ''; } var name = value.name; - if (value.className === 'Service.RPCMethod') { - name = _.capitalize(name); - } - if (value.hasOwnProperty('parent')) { - var parent_name = fullyQualifiedName(value.parent); - if (parent_name !== '') { - name = parent_name + '.' + name; - } + var parent_name = fullyQualifiedName(value.parent); + if (parent_name !== '') { + name = parent_name + '.' + name; } return name; }; diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js index 009ff63306..ff10a22e6a 100644 --- a/src/node/src/credentials.js +++ b/src/node/src/credentials.js @@ -99,6 +99,9 @@ exports.createFromMetadataGenerator = function(metadata_generator) { if (error.hasOwnProperty('code')) { code = error.code; } + if (!metadata) { + metadata = new Metadata(); + } } callback(code, message, metadata._getCoreRepresentation()); }); @@ -153,7 +156,7 @@ exports.combineCallCredentials = function() { current = current.compose(arguments[i]); } return current; -} +}; /** * Create an insecure credentials object. This is used to create a channel that diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index c316fe7f10..f1f86b35db 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -107,6 +107,23 @@ describe('call', function() { new grpc.Call(channel, 'method', 'now'); }, TypeError); }); + it('should succeed without the new keyword', function() { + assert.doesNotThrow(function() { + var call = grpc.Call(channel, 'method', new Date()); + assert(call instanceof grpc.Call); + }); + }); + }); + describe('deadline', function() { + it('should time out immediately with negative deadline', function(done) { + var call = new grpc.Call(channel, 'method', -Infinity); + var batch = {}; + batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(batch, function(err, response) { + assert.strictEqual(response.status.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + }); }); describe('startBatch', function() { it('should fail without an object and a function', function() { @@ -192,6 +209,43 @@ describe('call', function() { }); }); }); + describe('cancelWithStatus', function() { + it('should reject anything other than an integer and a string', function() { + assert.doesNotThrow(function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + call.cancelWithStatus(1, 'details'); + }); + assert.throws(function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + call.cancelWithStatus(); + }); + assert.throws(function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + call.cancelWithStatus(''); + }); + assert.throws(function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + call.cancelWithStatus(5, {}); + }); + }); + it('should reject the OK status code', function() { + assert.throws(function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + call.cancelWithStatus(0, 'details'); + }); + }); + it('should result in the call ending with a status', function(done) { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + var batch = {}; + batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(batch, function(err, response) { + assert.strictEqual(response.status.code, 5); + assert.strictEqual(response.status.details, 'details'); + done(); + }); + call.cancelWithStatus(5, 'details'); + }); + }); describe('getPeer', function() { it('should return a string', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js index 05269f7b6e..7163a5fb5e 100644 --- a/src/node/test/channel_test.js +++ b/src/node/test/channel_test.js @@ -104,6 +104,12 @@ describe('channel', function() { new grpc.Channel('hostname', insecureCreds, {'key' : new Date()}); }); }); + it('should succeed without the new keyword', function() { + assert.doesNotThrow(function() { + var channel = grpc.Channel('hostname', insecureCreds); + assert(channel instanceof grpc.Channel); + }); + }); }); describe('close', function() { var channel; @@ -155,7 +161,6 @@ describe('channel', function() { deadline.setSeconds(deadline.getSeconds() + 1); channel.watchConnectivityState(old_state, deadline, function(err, value) { assert(err); - console.log('Callback from watchConnectivityState'); done(); }); }); diff --git a/src/node/test/credentials_test.js b/src/node/test/credentials_test.js index 7fc311a888..3d0b38fd52 100644 --- a/src/node/test/credentials_test.js +++ b/src/node/test/credentials_test.js @@ -60,6 +60,22 @@ function multiDone(done, count) { }; } +var fakeSuccessfulGoogleCredentials = { + getRequestMetadata: function(service_url, callback) { + setTimeout(function() { + callback(null, {Authorization: 'success'}); + }, 0); + } +}; + +var fakeFailingGoogleCredentials = { + getRequestMetadata: function(service_url, callback) { + setTimeout(function() { + callback(new Error("Authorization failure")); + }, 0); + } +}; + describe('client credentials', function() { var Client; var server; @@ -169,6 +185,52 @@ describe('client credentials', function() { done(); }); }); + it.skip('should propagate errors that the updater emits', function(done) { + var metadataUpdater = function(service_url, callback) { + var error = new Error('Authentication error'); + error.code = grpc.status.UNAUTHENTICATED; + callback(error); + }; + var creds = grpc.credentials.createFromMetadataGenerator(metadataUpdater); + var combined_creds = grpc.credentials.combineChannelCredentials( + client_ssl_creds, creds); + var client = new Client('localhost:' + port, combined_creds, + client_options); + client.unary({}, function(err, data) { + assert(err); + assert.strictEqual(err.message, 'Authentication error'); + assert.strictEqual(err.code, grpc.status.UNAUTHENTICATED); + done(); + }); + }); + it('should successfully wrap a Google credential', function(done) { + var creds = grpc.credentials.createFromGoogleCredential( + fakeSuccessfulGoogleCredentials); + var combined_creds = grpc.credentials.combineChannelCredentials( + client_ssl_creds, creds); + var client = new Client('localhost:' + port, combined_creds, + client_options); + var call = client.unary({}, function(err, data) { + assert.ifError(err); + }); + call.on('metadata', function(metadata) { + assert.deepEqual(metadata.get('authorization'), ['success']); + done(); + }); + }); + it.skip('should get an error from a Google credential', function(done) { + var creds = grpc.credentials.createFromGoogleCredential( + fakeFailingGoogleCredentials); + var combined_creds = grpc.credentials.combineChannelCredentials( + client_ssl_creds, creds); + var client = new Client('localhost:' + port, combined_creds, + client_options); + client.unary({}, function(err, data) { + assert(err); + assert.strictEqual(err.message, 'Authorization failure'); + done(); + }); + }); describe('Per-rpc creds', function() { var client; var updater_creds; diff --git a/src/node/test/health_test.js b/src/node/test/health_test.js index a4dc24cf46..c93b528d42 100644 --- a/src/node/test/health_test.js +++ b/src/node/test/health_test.js @@ -45,11 +45,13 @@ describe('Health Checking', function() { 'grpc.test.TestServiceNotServing': 'NOT_SERVING', 'grpc.test.TestServiceServing': 'SERVING' }; - var healthServer = new grpc.Server(); - healthServer.addProtoService(health.service, - new health.Implementation(statusMap)); + var healthServer; + var healthImpl; var healthClient; before(function() { + healthServer = new grpc.Server(); + healthImpl = new health.Implementation(statusMap); + healthServer.addProtoService(health.service, healthImpl); var port_num = healthServer.bind('0.0.0.0:0', grpc.ServerCredentials.createInsecure()); healthServer.start(); @@ -89,4 +91,16 @@ describe('Health Checking', function() { done(); }); }); + it('should get a different response if the status changes', function(done) { + healthClient.check({service: 'transient'}, function(err, response) { + assert(err); + assert.strictEqual(err.code, grpc.status.NOT_FOUND); + healthImpl.setStatus('transient', 'SERVING'); + healthClient.check({service: 'transient'}, function(err, response) { + assert.ifError(err); + assert.strictEqual(response.status, 'SERVING'); + done(); + }); + }); + }); }); diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index f8c0b14137..f008a87585 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -71,7 +71,7 @@ describe('Interop tests', function() { interop_client.runTest(port, name_override, 'server_streaming', true, true, done); }); - it.only('should pass ping_pong', function(done) { + it('should pass ping_pong', function(done) { interop_client.runTest(port, name_override, 'ping_pong', true, true, done); }); it('should pass empty_stream', function(done) { diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 395ea887ec..39673e4e05 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -92,6 +92,31 @@ describe('File loader', function() { }); }); }); +describe('surface Server', function() { + var server; + beforeEach(function() { + server = new grpc.Server(); + }); + afterEach(function() { + server.forceShutdown(); + }); + it('should error if started twice', function() { + server.start(); + assert.throws(function() { + server.start(); + }); + }); + it('should error if a port is bound after the server starts', function() { + server.start(); + assert.throws(function() { + server.bind('localhost:0', grpc.ServerCredentials.createInsecure()); + }); + }); + it('should successfully shutdown if tryShutdown is called', function(done) { + server.start(); + server.tryShutdown(done); + }); +}); describe('Server.prototype.addProtoService', function() { var server; var dummyImpls = { @@ -202,6 +227,16 @@ describe('waitForClientReady', function() { }); }); }); + it('should time out if the server does not exist', function(done) { + var bad_client = new Client('nonexistent_hostname', + grpc.credentials.createInsecure()); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + grpc.waitForClientReady(bad_client, deadline, function(error) { + assert(error); + done(); + }); + }); }); describe('Echo service', function() { var server; @@ -365,6 +400,123 @@ describe('Echo metadata', function() { done(); }); }); + it('properly handles duplicate values', function(done) { + var dup_metadata = metadata.clone(); + dup_metadata.add('key', 'value2'); + var call = client.unary({}, function(err, data) {assert.ifError(err); }, + dup_metadata); + call.on('metadata', function(resp_metadata) { + // Two arrays are equal iff their symmetric difference is empty + assert.deepEqual(_.xor(dup_metadata.get('key'), resp_metadata.get('key')), + []); + done(); + }); + }); +}); +describe('Client malformed response handling', function() { + var server; + var client; + var badArg = new Buffer([0xFF]); + before(function() { + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_service = test_proto.lookup('TestService'); + var malformed_test_service = { + unary: { + path: '/TestService/Unary', + requestStream: false, + responseStream: false, + requestDeserialize: _.identity, + responseSerialize: _.identity + }, + clientStream: { + path: '/TestService/ClientStream', + requestStream: true, + responseStream: false, + requestDeserialize: _.identity, + responseSerialize: _.identity + }, + serverStream: { + path: '/TestService/ServerStream', + requestStream: false, + responseStream: true, + requestDeserialize: _.identity, + responseSerialize: _.identity + }, + bidiStream: { + path: '/TestService/BidiStream', + requestStream: true, + responseStream: true, + requestDeserialize: _.identity, + responseSerialize: _.identity + } + }; + server = new grpc.Server(); + server.addService(malformed_test_service, { + unary: function(call, cb) { + cb(null, badArg); + }, + clientStream: function(stream, cb) { + stream.on('data', function() {/* Ignore requests */}); + stream.on('end', function() { + cb(null, badArg); + }); + }, + serverStream: function(stream) { + stream.write(badArg); + stream.end(); + }, + bidiStream: function(stream) { + stream.on('data', function() { + // Ignore requests + stream.write(badArg); + }); + stream.on('end', function() { + stream.end(); + }); + } + }); + var port = server.bind('localhost:0', server_insecure_creds); + var Client = surface_client.makeProtobufClientConstructor(test_service); + client = new Client('localhost:' + port, grpc.credentials.createInsecure()); + server.start(); + }); + after(function() { + server.forceShutdown(); + }); + it('should get an INTERNAL status with a unary call', function(done) { + client.unary({}, function(err, data) { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + }); + it('should get an INTERNAL status with a client stream call', function(done) { + var call = client.clientStream(function(err, data) { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + call.write({}); + call.end(); + }); + it('should get an INTERNAL status with a server stream call', function(done) { + var call = client.serverStream({}); + call.on('data', function(){}); + call.on('error', function(err) { + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + }); + it('should get an INTERNAL status with a bidi stream call', function(done) { + var call = client.bidiStream(); + call.on('data', function(){}); + call.on('error', function(err) { + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + call.write({}); + call.end(); + }); }); describe('Other conditions', function() { var test_service; @@ -382,7 +534,8 @@ describe('Other conditions', function() { unary: function(call, cb) { var req = call.request; if (req.error) { - cb(new Error('Requested error'), null, trailer_metadata); + cb({code: grpc.status.UNKNOWN, + details: 'Requested error'}, null, trailer_metadata); } else { cb(null, {count: 1}, trailer_metadata); } @@ -407,7 +560,8 @@ describe('Other conditions', function() { serverStream: function(stream) { var req = stream.request; if (req.error) { - var err = new Error('Requested error'); + var err = {code: grpc.status.UNKNOWN, + details: 'Requested error'}; err.metadata = trailer_metadata; stream.emit('error', err); } else { @@ -447,6 +601,23 @@ describe('Other conditions', function() { assert.strictEqual(typeof grpc.getClientChannel(client).getTarget(), 'string'); }); + it('client should be able to pause and resume a stream', function(done) { + var call = client.bidiStream(); + call.on('data', function(data) { + assert(data.count < 3); + call.pause(); + setTimeout(function() { + call.resume(); + }, 10); + }); + call.on('end', function() { + done(); + }); + call.write({}); + call.write({}); + call.write({}); + call.end(); + }); describe('Server recieving bad input', function() { var misbehavingClient; var badArg = new Buffer([0xFF]); diff --git a/src/objective-c/.gitignore b/src/objective-c/.gitignore deleted file mode 100644 index 1511066800..0000000000 --- a/src/objective-c/.gitignore +++ /dev/null @@ -1,19 +0,0 @@ -# Xcode -# -build/ -*.pbxuser -!default.pbxuser -*.mode1v3 -!default.mode1v3 -*.mode2v3 -!default.mode2v3 -*.perspectivev3 -!default.perspectivev3 -xcuserdata -*.xccheckout -*.moved-aside -DerivedData -*.hmap -*.ipa -*.xcuserstate -*.DS_Store
\ No newline at end of file diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index 381b114399..0a3e1f78bf 100755 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -114,7 +114,7 @@ class BaseStub { return true; } if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) { - throw new Exception('Failed to connect to server'); + throw new \Exception('Failed to connect to server'); } return false; } @@ -153,6 +153,25 @@ class BaseStub { return array($metadata_copy, $timeout); } + /** + * validate and normalize the metadata array + * @param $metadata The metadata map + * @return $metadata Validated and key-normalized metadata map + * @throw InvalidArgumentException if key contains invalid characters + */ + private function _validate_and_normalize_metadata($metadata) { + $metadata_copy = array(); + foreach ($metadata as $key => $value) { + if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) { + throw new \InvalidArgumentException( + 'Metadata keys must be nonempty strings containing only '. + 'alphanumeric characters, hyphens and underscores'); + } + $metadata_copy[strtolower($key)] = $value; + } + return $metadata_copy; + } + /* This class is intended to be subclassed by generated code, so all functions begin with "_" to avoid name collisions. */ @@ -178,6 +197,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } + $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); $call->start($argument, $actual_metadata, $options); return $call; } @@ -204,6 +224,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } + $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); $call->start($actual_metadata); return $call; } @@ -231,6 +252,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } + $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); $call->start($argument, $actual_metadata, $options); return $call; } @@ -254,6 +276,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } + $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); $call->start($actual_metadata); return $call; } diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php index 9cee188666..5cdba1e5a0 100644 --- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php +++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php @@ -51,6 +51,14 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase { $this->assertTrue(is_string(self::$client->getTarget())); } + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidMetadata() { + $div_arg = new math\DivArgs(); + $call = self::$client->Div($div_arg, array(' ' => 'abc123')); + } + public function testWriteFlags() { $div_arg = new math\DivArgs(); $div_arg->setDividend(7); diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index 0590264ef8..6670ef3ab9 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -36,6 +36,9 @@ require 'empty.php'; require 'message_set.php'; require 'messages.php'; require 'test.php'; +use Google\Auth\CredentialsLoader; +use Google\Auth\ApplicationDefaultCredentials; +use GuzzleHttp\ClientInterface; /** * Assertion function that always exits with an error code if the assertion is @@ -52,7 +55,6 @@ function hardAssert($value, $error_message) { /** * Run the empty_unary test. - * Passes when run against the Node server as of 2015-04-30 * @param $stub Stub object that has service methods */ function emptyUnary($stub) { @@ -63,7 +65,6 @@ function emptyUnary($stub) { /** * Run the large_unary test. - * Passes when run against the C++/Node server as of 2015-04-30 * @param $stub Stub object that has service methods */ function largeUnary($stub) { @@ -76,7 +77,8 @@ function largeUnary($stub) { * @param $fillUsername boolean whether to fill result with username * @param $fillOauthScope boolean whether to fill result with oauth scope */ -function performLargeUnary($stub, $fillUsername = false, $fillOauthScope = false) { +function performLargeUnary($stub, $fillUsername = false, $fillOauthScope = false, + $metadata = array()) { $request_len = 271828; $response_len = 314159; @@ -90,7 +92,7 @@ function performLargeUnary($stub, $fillUsername = false, $fillOauthScope = false $request->setFillUsername($fillUsername); $request->setFillOauthScope($fillOauthScope); - list($result, $status) = $stub->UnaryCall($request)->wait(); + list($result, $status) = $stub->UnaryCall($request, $metadata)->wait(); hardAssert($status->code === Grpc\STATUS_OK, 'Call did not complete successfully'); hardAssert($result !== null, 'Call returned a null response'); $payload = $result->getPayload(); @@ -105,7 +107,6 @@ function performLargeUnary($stub, $fillUsername = false, $fillOauthScope = false /** * Run the service account credentials auth test. - * Passes when run against the cloud server as of 2015-04-30 * @param $stub Stub object that has service methods * @param $args array command line args */ @@ -114,7 +115,7 @@ function serviceAccountCreds($stub, $args) { throw new Exception('Missing oauth scope'); } $jsonKey = json_decode( - file_get_contents(getenv(Google\Auth\CredentialsLoader::ENV_VAR)), + file_get_contents(getenv(CredentialsLoader::ENV_VAR)), true); $result = performLargeUnary($stub, $fillUsername=true, $fillOauthScope=true); hardAssert($result->getUsername() == $jsonKey['client_email'], @@ -143,13 +144,12 @@ function computeEngineCreds($stub, $args) { /** * Run the jwt token credentials auth test. - * Passes when run against the cloud server as of 2015-05-12 * @param $stub Stub object that has service methods * @param $args array command line args */ function jwtTokenCreds($stub, $args) { $jsonKey = json_decode( - file_get_contents(getenv(Google\Auth\CredentialsLoader::ENV_VAR)), + file_get_contents(getenv(CredentialsLoader::ENV_VAR)), true); $result = performLargeUnary($stub, $fillUsername=true, $fillOauthScope=true); hardAssert($result->getUsername() == $jsonKey['client_email'], @@ -157,8 +157,44 @@ function jwtTokenCreds($stub, $args) { } /** + * Run the oauth2_auth_token auth test. + * @param $stub Stub object that has service methods + * @param $args array command line args + */ +function oauth2AuthToken($stub, $args) { + $jsonKey = json_decode( + file_get_contents(getenv(CredentialsLoader::ENV_VAR)), + true); + $result = performLargeUnary($stub, $fillUsername=true, $fillOauthScope=true); + hardAssert($result->getUsername() == $jsonKey['client_email'], + 'invalid email returned'); +} + +/** + * Run the per_rpc_creds auth test. + * @param $stub Stub object that has service methods + * @param $args array command line args + */ +function perRpcCreds($stub, $args) { + $jsonKey = json_decode( + file_get_contents(getenv(CredentialsLoader::ENV_VAR)), + true); + $auth_credentials = ApplicationDefaultCredentials::getCredentials( + $args['oauth_scope'] + ); + $token = $auth_credentials->fetchAuthToken(); + $metadata = array(CredentialsLoader::AUTH_METADATA_KEY => + array(sprintf("%s %s", + $token['token_type'], + $token['access_token']))); + $result = performLargeUnary($stub, $fillUsername=true, $fillOauthScope=true, + $metadata); + hardAssert($result->getUsername() == $jsonKey['client_email'], + 'invalid email returned'); +} + +/** * Run the client_streaming test. - * Passes when run against the Node server as of 2015-04-30 * @param $stub Stub object that has service methods */ function clientStreaming($stub) { @@ -185,7 +221,6 @@ function clientStreaming($stub) { /** * Run the server_streaming test. - * Passes when run against the Node server as of 2015-04-30 * @param $stub Stub object that has service methods. */ function serverStreaming($stub) { @@ -216,7 +251,6 @@ function serverStreaming($stub) { /** * Run the ping_pong test. - * Passes when run against the Node server as of 2015-04-30 * @param $stub Stub object that has service methods. */ function pingPong($stub) { @@ -252,24 +286,18 @@ function pingPong($stub) { /** * Run the empty_stream test. - * Passes when run against the Node server as of 2015-10-09 * @param $stub Stub object that has service methods. */ function emptyStream($stub) { - // for the current PHP implementation, $call->read() will wait - // forever for a server response if the server is not sending any. - // so this test is imeplemented as a timeout to indicate the absence - // of receiving any response from the server - $call = $stub->FullDuplexCall(array('timeout' => 100000)); + $call = $stub->FullDuplexCall(); $call->writesDone(); hardAssert($call->read() === null, 'Server returned too many responses'); hardAssert($call->getStatus()->code === Grpc\STATUS_OK, - 'Call did not complete successfully'); + 'Call did not complete successfully'); } /** * Run the cancel_after_begin test. - * Passes when run against the Node server as of 2015-08-28 * @param $stub Stub object that has service methods. */ function cancelAfterBegin($stub) { @@ -282,7 +310,6 @@ function cancelAfterBegin($stub) { /** * Run the cancel_after_first_response test. - * Passes when run against the Node server as of 2015-04-30 * @param $stub Stub object that has service methods. */ function cancelAfterFirstResponse($stub) { @@ -323,12 +350,17 @@ function timeoutOnSleepingServer($stub) { } $args = getopt('', array('server_host:', 'server_port:', 'test_case:', + 'use_tls::', 'use_test_ca::', 'server_host_override:', 'oauth_scope:', 'default_service_account:')); -if (!array_key_exists('server_host', $args) || - !array_key_exists('server_port', $args) || - !array_key_exists('test_case', $args)) { - throw new Exception('Missing argument'); +if (!array_key_exists('server_host', $args)) { + throw new Exception('Missing argument: --server_host is required'); +} +if (!array_key_exists('server_port', $args)) { + throw new Exception('Missing argument: --server_port is required'); +} +if (!array_key_exists('test_case', $args)) { + throw new Exception('Missing argument: --test_case is required'); } if ($args['server_port'] == 443) { @@ -337,41 +369,76 @@ if ($args['server_port'] == 443) { $server_address = $args['server_host'] . ':' . $args['server_port']; } -if (!array_key_exists('server_host_override', $args)) { - $args['server_host_override'] = 'foo.test.google.fr'; +$test_case = $args['test_case']; + +$host_override = 'foo.test.google.fr'; +if (array_key_exists('server_host_override', $args)) { + $host_override = $args['server_host_override']; +} + +$use_tls = false; +if (array_key_exists('use_tls', $args) && + $args['use_tls'] != 'false') { + $use_tls = true; } -$ssl_cert_file = getenv('SSL_CERT_FILE'); -if (!$ssl_cert_file) { - $ssl_cert_file = dirname(__FILE__) . '/../data/ca.pem'; +$use_test_ca = false; +if (array_key_exists('use_test_ca', $args) && + $args['use_test_ca'] != 'false') { + $use_test_ca = true; } -$credentials = Grpc\Credentials::createSsl(file_get_contents($ssl_cert_file)); +$opts = []; -$opts = [ - 'grpc.ssl_target_name_override' => $args['server_host_override'], - 'credentials' => $credentials, - ]; +if ($use_tls) { + if ($use_test_ca) { + $ssl_cert_file = dirname(__FILE__) . '/../data/ca.pem'; + } else { + $ssl_cert_file = getenv('SSL_CERT_FILE'); + } + $ssl_credentials = Grpc\Credentials::createSsl( + file_get_contents($ssl_cert_file)); + $opts['credentials'] = $ssl_credentials; + $opts['grpc.ssl_target_name_override'] = $host_override; +} -if (in_array($args['test_case'], array( - 'service_account_creds', - 'compute_engine_creds', - 'jwt_token_creds'))) { - if ($args['test_case'] == 'jwt_token_creds') { - $auth = Google\Auth\ApplicationDefaultCredentials::getCredentials(); +if (in_array($test_case, array('service_account_creds', + 'compute_engine_creds', 'jwt_token_creds'))) { + if ($test_case == 'jwt_token_creds') { + $auth_credentials = ApplicationDefaultCredentials::getCredentials(); } else { - $auth = Google\Auth\ApplicationDefaultCredentials::getCredentials( - $args['oauth_scope']); + $auth_credentials = ApplicationDefaultCredentials::getCredentials( + $args['oauth_scope'] + ); } - $opts['update_metadata'] = $auth->getUpdateMetadataFunc(); + $opts['update_metadata'] = $auth_credentials->getUpdateMetadataFunc(); +} + +if ($test_case == 'oauth2_auth_token') { + $auth_credentials = ApplicationDefaultCredentials::getCredentials( + $args['oauth_scope'] + ); + $token = $auth_credentials->fetchAuthToken(); + $update_metadata = + function($metadata, + $authUri = null, + ClientInterface $client = null) use ($token) { + $metadata_copy = $metadata; + $metadata_copy[CredentialsLoader::AUTH_METADATA_KEY] = + array(sprintf("%s %s", + $token['token_type'], + $token['access_token'])); + return $metadata_copy; + }; + $opts['update_metadata'] = $update_metadata; } $stub = new grpc\testing\TestServiceClient($server_address, $opts); echo "Connecting to $server_address\n"; -echo "Running test case $args[test_case]\n"; +echo "Running test case $test_case\n"; -switch ($args['test_case']) { +switch ($test_case) { case 'empty_unary': emptyUnary($stub); break; @@ -408,7 +475,13 @@ switch ($args['test_case']) { case 'jwt_token_creds': jwtTokenCreds($stub, $args); break; + case 'oauth2_auth_token': + oauth2AuthToken($stub, $args); + break; + case 'per_rpc_creds': + perRpcCreds($stub, $args); + break; default: - echo "Unsupported test case $args[test_case]\n"; + echo "Unsupported test case $test_case\n"; exit(1); } diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx index 4349786b3a..ed037b660a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx @@ -29,6 +29,7 @@ cimport cpython +from grpc._cython._cygrpc cimport grpc from grpc._cython._cygrpc cimport records @@ -49,7 +50,7 @@ cdef class Call: cpython.Py_INCREF(operation_tag) return grpc.grpc_call_start_batch( self.c_call, cy_operations.c_ops, cy_operations.c_nops, - <cpython.PyObject *>operation_tag) + <cpython.PyObject *>operation_tag, NULL) def cancel(self, grpc.grpc_status_code error_code=grpc.GRPC_STATUS__DO_NOT_USE, @@ -67,9 +68,10 @@ cdef class Call: raise TypeError("expected details to be str or bytes") if error_code != grpc.GRPC_STATUS__DO_NOT_USE: self.references.append(details) - return grpc.grpc_call_cancel_with_status(self.c_call, error_code, details) + return grpc.grpc_call_cancel_with_status(self.c_call, error_code, details, + NULL) else: - return grpc.grpc_call_cancel(self.c_call) + return grpc.grpc_call_cancel(self.c_call, NULL) def __dealloc__(self): if self.c_call != NULL: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx index b20313818d..b52ddb6bcd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx @@ -30,6 +30,7 @@ from grpc._cython._cygrpc cimport call from grpc._cython._cygrpc cimport completion_queue from grpc._cython._cygrpc cimport credentials +from grpc._cython._cygrpc cimport grpc from grpc._cython._cygrpc cimport records @@ -49,15 +50,17 @@ cdef class Channel: else: raise TypeError("expected target to be str or bytes") if client_credentials is None: - self.c_channel = grpc.grpc_channel_create(target, c_arguments) + self.c_channel = grpc.grpc_insecure_channel_create(target, c_arguments, + NULL) else: self.c_channel = grpc.grpc_secure_channel_create( - client_credentials.c_credentials, target, c_arguments) + client_credentials.c_credentials, target, c_arguments, NULL) self.references.append(client_credentials) self.references.append(target) self.references.append(arguments) - def create_call(self, completion_queue.CompletionQueue queue not None, + def create_call(self, call.Call parent, int flags, + completion_queue.CompletionQueue queue not None, method, host, records.Timespec deadline not None): if queue.is_shutting_down: raise ValueError("queue must not be shutting down or shutdown") @@ -75,8 +78,13 @@ cdef class Channel: raise TypeError("expected host to be str or bytes") cdef call.Call operation_call = call.Call() operation_call.references = [self, method, host, queue] + cdef grpc.grpc_call *parent_call = NULL + if parent is not None: + parent_call = parent.c_call operation_call.c_call = grpc.grpc_channel_create_call( - self.c_channel, queue.c_completion_queue, method, host, deadline.c_time) + self.c_channel, parent_call, flags, + queue.c_completion_queue, method, host, deadline.c_time, + NULL) return operation_call def __dealloc__(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx index 886d85360a..a7a265eab7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx @@ -30,6 +30,7 @@ cimport cpython from grpc._cython._cygrpc cimport call +from grpc._cython._cygrpc cimport grpc from grpc._cython._cygrpc cimport records import threading @@ -39,7 +40,7 @@ import time cdef class CompletionQueue: def __cinit__(self): - self.c_completion_queue = grpc.grpc_completion_queue_create() + self.c_completion_queue = grpc.grpc_completion_queue_create(NULL) self.is_shutting_down = False self.is_shutdown = False self.poll_condition = threading.Condition() @@ -48,7 +49,8 @@ cdef class CompletionQueue: def poll(self, records.Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). - cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future + cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future( + grpc.GPR_CLOCK_REALTIME) cdef records.OperationTag tag = None cdef object user_tag = None cdef call.Call operation_call = None @@ -66,7 +68,7 @@ cdef class CompletionQueue: self.is_polling = True with nogil: event = grpc.grpc_completion_queue_next( - self.c_completion_queue, c_deadline) + self.c_completion_queue, c_deadline, NULL) with self.poll_condition: self.is_polling = False self.poll_condition.notify() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx index dc40a7a611..608207f0a2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx @@ -27,6 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from grpc._cython._cygrpc cimport grpc from grpc._cython._cygrpc cimport records @@ -81,13 +82,11 @@ def client_credentials_ssl(pem_root_certificates, credentials.references.append(pem_root_certificates) if ssl_pem_key_cert_pair is not None: credentials.c_credentials = grpc.grpc_ssl_credentials_create( - c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair - ) + c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) credentials.references.append(ssl_pem_key_cert_pair) else: credentials.c_credentials = grpc.grpc_ssl_credentials_create( - c_pem_root_certificates, NULL - ) + c_pem_root_certificates, NULL, NULL) def client_credentials_composite_credentials( ClientCredentials credentials_1 not None, @@ -96,18 +95,20 @@ def client_credentials_composite_credentials( raise ValueError("passed credentials must both be valid") cdef ClientCredentials credentials = ClientCredentials() credentials.c_credentials = grpc.grpc_composite_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials) + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials -def client_credentials_compute_engine(): +def client_credentials_google_compute_engine(): cdef ClientCredentials credentials = ClientCredentials() - credentials.c_credentials = grpc.grpc_compute_engine_credentials_create() + credentials.c_credentials = ( + grpc.grpc_google_compute_engine_credentials_create(NULL)) return credentials #TODO rename to something like client_credentials_service_account_jwt_access. -def client_credentials_jwt(json_key, records.Timespec token_lifetime not None): +def client_credentials_service_account_jwt_access( + json_key, records.Timespec token_lifetime not None): if isinstance(json_key, bytes): pass elif isinstance(json_key, basestring): @@ -115,12 +116,13 @@ def client_credentials_jwt(json_key, records.Timespec token_lifetime not None): else: raise TypeError("expected json_key to be str or bytes") cdef ClientCredentials credentials = ClientCredentials() - credentials.c_credentials = grpc.grpc_service_account_jwt_access_credentials_create( - json_key, token_lifetime.c_time) + credentials.c_credentials = ( + grpc.grpc_service_account_jwt_access_credentials_create( + json_key, token_lifetime.c_time, NULL)) credentials.references.append(json_key) return credentials -def client_credentials_refresh_token(json_refresh_token): +def client_credentials_google_refresh_token(json_refresh_token): if isinstance(json_refresh_token, bytes): pass elif isinstance(json_refresh_token, basestring): @@ -128,12 +130,12 @@ def client_credentials_refresh_token(json_refresh_token): else: raise TypeError("expected json_refresh_token to be str or bytes") cdef ClientCredentials credentials = ClientCredentials() - credentials.c_credentials = grpc.grpc_refresh_token_credentials_create( - json_refresh_token) + credentials.c_credentials = grpc.grpc_google_refresh_token_credentials_create( + json_refresh_token, NULL) credentials.references.append(json_refresh_token) return credentials -def client_credentials_iam(authorization_token, authority_selector): +def client_credentials_google_iam(authorization_token, authority_selector): if isinstance(authorization_token, bytes): pass elif isinstance(authorization_token, basestring): @@ -147,13 +149,14 @@ def client_credentials_iam(authorization_token, authority_selector): else: raise TypeError("expected authority_selector to be str or bytes") cdef ClientCredentials credentials = ClientCredentials() - credentials.c_credentials = grpc.grpc_iam_credentials_create( - authorization_token, authority_selector) + credentials.c_credentials = grpc.grpc_google_iam_credentials_create( + authorization_token, authority_selector, NULL) credentials.references.append(authorization_token) credentials.references.append(authority_selector) return credentials -def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs): +def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, + bint force_client_auth): if pem_root_certs is None: pass elif isinstance(pem_root_certs, bytes): @@ -181,7 +184,6 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs): (<records.SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair) credentials.c_credentials = grpc.grpc_ssl_server_credentials_create( pem_root_certs, credentials.c_ssl_pem_key_cert_pairs, - credentials.c_ssl_pem_key_cert_pairs_count - ) + credentials.c_ssl_pem_key_cert_pairs_count, force_client_auth, NULL) return credentials diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd index 8b46972490..62d40e7a58 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd @@ -64,15 +64,25 @@ cdef extern from "grpc/support/port_platform.h": cdef extern from "grpc/support/time.h": + ctypedef enum gpr_clock_type: + GPR_CLOCK_MONOTONIC + GPR_CLOCK_REALTIME + GPR_CLOCK_PRECISE + GPR_TIMESPAN + ctypedef struct gpr_timespec: libc.time.time_t seconds "tv_sec" int nanoseconds "tv_nsec" + gpr_clock_type clock_type + + gpr_timespec gpr_time_0(gpr_clock_type type) + gpr_timespec gpr_inf_future(gpr_clock_type type) + gpr_timespec gpr_inf_past(gpr_clock_type type) - cdef gpr_timespec gpr_time_0 - cdef gpr_timespec gpr_inf_future - cdef gpr_timespec gpr_inf_past + gpr_timespec gpr_now(gpr_clock_type clock) - gpr_timespec gpr_now() + gpr_timespec gpr_convert_clock_type(gpr_timespec t, + gpr_clock_type target_clock) cdef extern from "grpc/status.h": @@ -255,38 +265,44 @@ cdef extern from "grpc/grpc.h": void grpc_init() void grpc_shutdown() - grpc_completion_queue *grpc_completion_queue_create() + grpc_completion_queue *grpc_completion_queue_create(void *reserved) grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, - gpr_timespec deadline) nogil + gpr_timespec deadline, + void *reserved) nogil void grpc_completion_queue_shutdown(grpc_completion_queue *cq) void grpc_completion_queue_destroy(grpc_completion_queue *cq) grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, - size_t nops, void *tag) - grpc_call_error grpc_call_cancel(grpc_call *call) + size_t nops, void *tag, void *reserved) + grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, - const char *description) + const char *description, + void *reserved) void grpc_call_destroy(grpc_call *call) - grpc_channel *grpc_channel_create(const char *target, - const grpc_channel_args *args) + grpc_channel *grpc_insecure_channel_create(const char *target, + const grpc_channel_args *args, + void *reserved) grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_call *parent_call, + gpr_uint32 propagation_mask, grpc_completion_queue *completion_queue, const char *method, const char *host, - gpr_timespec deadline) + gpr_timespec deadline, void *reserved) void grpc_channel_destroy(grpc_channel *channel) - grpc_server *grpc_server_create(const grpc_channel_args *args) + grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new) void grpc_server_register_completion_queue(grpc_server *server, - grpc_completion_queue *cq) - int grpc_server_add_http2_port(grpc_server *server, const char *addr) + grpc_completion_queue *cq, + void *reserved) + int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) void grpc_server_start(grpc_server *server) void grpc_server_shutdown_and_notify( grpc_server *server, grpc_completion_queue *cq, void *tag) @@ -306,22 +322,27 @@ cdef extern from "grpc/grpc_security.h": grpc_credentials *grpc_google_default_credentials_create() grpc_credentials *grpc_ssl_credentials_create( - const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair) + const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, + void *reserved) grpc_credentials *grpc_composite_credentials_create(grpc_credentials *creds1, - grpc_credentials *creds2) - grpc_credentials *grpc_compute_engine_credentials_create() - grpc_credentials *grpc_service_account_jwt_access_credentials_create(const char *json_key, - gpr_timespec token_lifetime) - grpc_credentials *grpc_refresh_token_credentials_create( - const char *json_refresh_token) - grpc_credentials *grpc_iam_credentials_create(const char *authorization_token, - const char *authority_selector) + grpc_credentials *creds2, + void *reserved) + grpc_credentials *grpc_google_compute_engine_credentials_create( + void *reserved) + grpc_credentials *grpc_service_account_jwt_access_credentials_create( + const char *json_key, + gpr_timespec token_lifetime, void *reserved) + grpc_credentials *grpc_google_refresh_token_credentials_create( + const char *json_refresh_token, void *reserved) + grpc_credentials *grpc_google_iam_credentials_create( + const char *authorization_token, const char *authority_selector, + void *reserved) void grpc_credentials_release(grpc_credentials *creds) grpc_channel *grpc_secure_channel_create( grpc_credentials *creds, const char *target, - const grpc_channel_args *args) + const grpc_channel_args *args, void *reserved) ctypedef struct grpc_server_credentials: # We don't care about the internals (and in fact don't know them) @@ -330,7 +351,7 @@ cdef extern from "grpc/grpc_security.h": grpc_server_credentials *grpc_ssl_server_credentials_create( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, - size_t num_key_cert_pairs) + size_t num_key_cert_pairs, int force_client_auth, void *reserved) void grpc_server_credentials_release(grpc_server_credentials *creds) int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx index 4814769fd2..8edee09c2d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx @@ -87,28 +87,38 @@ cdef class Timespec: def __cinit__(self, time): if time is None: - self.c_time = grpc.gpr_now() + self.c_time = grpc.gpr_now(grpc.GPR_CLOCK_REALTIME) elif isinstance(time, float): if time == float("+inf"): - self.c_time = grpc.gpr_inf_future + self.c_time = grpc.gpr_inf_future(grpc.GPR_CLOCK_REALTIME) elif time == float("-inf"): - self.c_time = grpc.gpr_inf_past + self.c_time = grpc.gpr_inf_past(grpc.GPR_CLOCK_REALTIME) else: self.c_time.seconds = time self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9 + self.c_time.clock_type = grpc.GPR_CLOCK_REALTIME else: raise TypeError("expected time to be float") @property def seconds(self): - return self.c_time.seconds + # TODO(atash) ensure that everywhere a Timespec is created that it's + # converted to GPR_CLOCK_REALTIME then and not every time someone wants to + # read values off in Python. + cdef grpc.gpr_timespec real_time = ( + grpc.gpr_convert_clock_type(self.c_time, grpc.GPR_CLOCK_REALTIME)) + return real_time.seconds @property def nanoseconds(self): - return self.c_time.nanoseconds + cdef grpc.gpr_timespec real_time = ( + grpc.gpr_convert_clock_type(self.c_time, grpc.GPR_CLOCK_REALTIME)) + return real_time.nanoseconds def __float__(self): - return <double>self.c_time.seconds + <double>self.c_time.nanoseconds / 1e9 + cdef grpc.gpr_timespec real_time = ( + grpc.gpr_convert_clock_type(self.c_time, grpc.GPR_CLOCK_REALTIME)) + return <double>real_time.seconds + <double>real_time.nanoseconds / 1e9 infinite_future = Timespec(float("+inf")) infinite_past = Timespec(float("-inf")) @@ -339,13 +349,16 @@ cdef class _MetadataIterator: self.i = 0 self.metadata = metadata + def __iter__(self): + return self + def __next__(self): if self.i < len(self.metadata): result = self.metadata[self.i] self.i = self.i + 1 return result else: - raise StopIteration() + raise StopIteration cdef class Metadata: @@ -536,13 +549,16 @@ cdef class _OperationsIterator: self.i = 0 self.operations = operations + def __iter__(self): + return self + def __next__(self): if self.i < len(self.operations): result = self.operations[self.i] self.i = self.i + 1 return result else: - raise StopIteration() + raise StopIteration cdef class Operations: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx index dcf9d38337..6d20d2910c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx @@ -32,6 +32,7 @@ cimport cpython from grpc._cython._cygrpc cimport call from grpc._cython._cygrpc cimport completion_queue from grpc._cython._cygrpc cimport credentials +from grpc._cython._cygrpc cimport grpc from grpc._cython._cygrpc cimport records import time @@ -46,7 +47,7 @@ cdef class Server: if arguments is not None: c_arguments = &arguments.c_args self.references.append(arguments) - self.c_server = grpc.grpc_server_create(c_arguments) + self.c_server = grpc.grpc_server_create(c_arguments, NULL) self.is_started = False self.is_shutting_down = False self.is_shutdown = False @@ -78,7 +79,7 @@ cdef class Server: if self.is_started: raise ValueError("cannot register completion queues after start") grpc.grpc_server_register_completion_queue( - self.c_server, queue.c_completion_queue) + self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) def start(self): @@ -103,7 +104,7 @@ cdef class Server: return grpc.grpc_server_add_secure_http2_port( self.c_server, address, server_credentials.c_credentials) else: - return grpc.grpc_server_add_http2_port(self.c_server, address) + return grpc.grpc_server_add_insecure_http2_port(self.c_server, address) def shutdown(self, completion_queue.CompletionQueue queue not None, tag): cdef records.OperationTag operation_tag diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index f4d9661580..1ef2997db5 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -78,11 +78,13 @@ client_credentials_google_default = ( client_credentials_ssl = credentials.client_credentials_ssl client_credentials_composite_credentials = ( credentials.client_credentials_composite_credentials) -client_credentials_compute_engine = ( - credentials.client_credentials_compute_engine) -client_credentials_jwt = credentials.client_credentials_jwt -client_credentials_refresh_token = credentials.client_credentials_refresh_token -client_credentials_iam = credentials.client_credentials_iam +client_credentials_google_compute_engine = ( + credentials.client_credentials_google_compute_engine) +client_credentials_jwt_access = ( + credentials.client_credentials_service_account_jwt_access) +client_credentials_refresh_token = ( + credentials.client_credentials_google_refresh_token) +client_credentials_google_iam = credentials.client_credentials_google_iam server_credentials_ssl = credentials.server_credentials_ssl CompletionQueue = completion_queue.CompletionQueue diff --git a/src/python/grpcio/requirements.txt b/src/python/grpcio/requirements.txt index 77356e0a74..ee8568120b 100644 --- a/src/python/grpcio/requirements.txt +++ b/src/python/grpcio/requirements.txt @@ -1,2 +1,3 @@ enum34>=1.0.4 futures>=2.2.0 +cython>=0.23 diff --git a/src/python/grpcio/setup.py b/src/python/grpcio/setup.py index 8b87c09d5c..97fa4fe6b3 100644 --- a/src/python/grpcio/setup.py +++ b/src/python/grpcio/setup.py @@ -34,6 +34,7 @@ import os.path import sys from distutils import core as _core +from distutils import extension as _extension import setuptools # Ensure we're in the proper directory whether or not we're being used by pip. @@ -59,6 +60,18 @@ _C_EXTENSION_SOURCES = ( 'grpc/_adapter/_c/types/server.c', ) +_CYTHON_EXTENSION_PACKAGE_NAMES = () + +_CYTHON_EXTENSION_MODULE_NAMES = ( + 'grpc._cython.cygrpc', + 'grpc._cython._cygrpc.call', + 'grpc._cython._cygrpc.channel', + 'grpc._cython._cygrpc.completion_queue', + 'grpc._cython._cygrpc.credentials', + 'grpc._cython._cygrpc.records', + 'grpc._cython._cygrpc.server', +) + _EXTENSION_INCLUDE_DIRECTORIES = ( '.', ) @@ -78,9 +91,30 @@ _C_EXTENSION_MODULE = _core.Extension( ) _EXTENSION_MODULES = [_C_EXTENSION_MODULE] -_PACKAGES = ( - setuptools.find_packages('.', exclude=['*._cython', '*._cython.*']) -) + +def cython_extensions(package_names, module_names, include_dirs, libraries, + build_with_cython=False): + file_extension = 'pyx' if build_with_cython else 'c' + module_files = [name.replace('.', '/') + '.' + file_extension + for name in module_names] + extensions = [ + _extension.Extension( + name=module_name, sources=[module_file], + include_dirs=include_dirs, libraries=libraries + ) for (module_name, module_file) in zip(module_names, module_files) + ] + if build_with_cython: + import Cython.Build + return Cython.Build.cythonize(extensions) + else: + return extensions + +_CYTHON_EXTENSION_MODULES = cython_extensions( + list(_CYTHON_EXTENSION_PACKAGE_NAMES), list(_CYTHON_EXTENSION_MODULE_NAMES), + list(_EXTENSION_INCLUDE_DIRECTORIES), list(_EXTENSION_LIBRARIES), + bool(_BUILD_WITH_CYTHON)) + +_PACKAGES = setuptools.find_packages('.') _PACKAGE_DIRECTORIES = { '': '.', @@ -104,7 +138,7 @@ _COMMAND_CLASS = { setuptools.setup( name='grpcio', version='0.11.0b1', - ext_modules=_EXTENSION_MODULES, + ext_modules=_EXTENSION_MODULES + _CYTHON_EXTENSION_MODULES, packages=list(_PACKAGES), package_dir=_PACKAGE_DIRECTORIES, install_requires=_INSTALL_REQUIRES, diff --git a/src/python/grpcio_test/.gitignore b/src/python/grpcio_test/.gitignore index e3540baa7c..4bb4d42dfe 100644 --- a/src/python/grpcio_test/.gitignore +++ b/src/python/grpcio_test/.gitignore @@ -7,5 +7,5 @@ dist/ *.eggs/ .coverage .coverage.* -.cache +.cache/ nosetests.xml diff --git a/src/python/grpcio_test/grpc_interop/client.py b/src/python/grpcio_test/grpc_interop/client.py index 36afe6c096..01928886b4 100644 --- a/src/python/grpcio_test/grpc_interop/client.py +++ b/src/python/grpcio_test/grpc_interop/client.py @@ -49,11 +49,11 @@ def _args(): parser.add_argument( '--test_case', help='the test case to execute', type=str) parser.add_argument( - '--use_tls', help='require a secure connection', dest='use_tls', - action='store_true') + '--use_tls', help='require a secure connection', default=False, + type=resources.parse_bool) parser.add_argument( '--use_test_ca', help='replace platform root CAs with ca.pem', - action='store_true') + default=False, type=resources.parse_bool) parser.add_argument( '--server_host_override', help='the server host to which to claim to connect', type=str) diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py index 52b800af7a..8ab5bac302 100644 --- a/src/python/grpcio_test/grpc_interop/methods.py +++ b/src/python/grpcio_test/grpc_interop/methods.py @@ -338,6 +338,17 @@ def _timeout_on_sleeping_server(stub): raise ValueError('expected call to exceed deadline') +def _empty_stream(stub): + with stub, _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) + pipe.close() + try: + next(response_iterator) + raise ValueError('expected exactly 0 responses') + except StopIteration: + pass + + def _compute_engine_creds(stub, args): response = _large_unary_common_behavior(stub, True, True) if args.default_service_account != response.username: @@ -368,6 +379,7 @@ class TestCase(enum.Enum): PING_PONG = 'ping_pong' CANCEL_AFTER_BEGIN = 'cancel_after_begin' CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' + EMPTY_STREAM = 'empty_stream' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' @@ -389,6 +401,8 @@ class TestCase(enum.Enum): _cancel_after_first_response(stub) elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: _timeout_on_sleeping_server(stub) + elif self is TestCase.EMPTY_STREAM: + _empty_stream(stub) elif self is TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds(stub, args) elif self is TestCase.OAUTH2_AUTH_TOKEN: diff --git a/src/python/grpcio_test/grpc_interop/resources.py b/src/python/grpcio_test/grpc_interop/resources.py index 2c3045313d..1122499418 100644 --- a/src/python/grpcio_test/grpc_interop/resources.py +++ b/src/python/grpcio_test/grpc_interop/resources.py @@ -29,6 +29,7 @@ """Constants and functions for data used in interoperability testing.""" +import argparse import os import pkg_resources @@ -54,3 +55,11 @@ def private_key(): def certificate_chain(): return pkg_resources.resource_string( __name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) + + +def parse_bool(value): + if value == 'true': + return True + if value == 'false': + return False + raise argparse.ArgumentTypeError('Only true/false allowed') diff --git a/src/python/grpcio_test/grpc_interop/server.py b/src/python/grpcio_test/grpc_interop/server.py index 60f630a6be..d4c1b4dbf6 100644 --- a/src/python/grpcio_test/grpc_interop/server.py +++ b/src/python/grpcio_test/grpc_interop/server.py @@ -46,8 +46,8 @@ def serve(): parser.add_argument( '--port', help='the port on which to serve', type=int) parser.add_argument( - '--use_tls', help='require a secure connection', dest='use_tls', - action='store_true') + '--use_tls', help='require a secure connection', + default=False, type=resources.parse_bool) args = parser.parse_args() if args.use_tls: diff --git a/src/python/grpcio_test/grpc_test/_cython/cygrpc_test.py b/src/python/grpcio_test/grpc_test/_cython/cygrpc_test.py index 637506b42e..1307a30ca0 100644 --- a/src/python/grpcio_test/grpc_test/_cython/cygrpc_test.py +++ b/src/python/grpcio_test/grpc_test/_cython/cygrpc_test.py @@ -32,6 +32,7 @@ import unittest from grpc._cython import cygrpc from grpc_test._cython import test_utilities +from grpc_test import test_common class TypeSmokeTest(unittest.TestCase): @@ -139,7 +140,7 @@ class InsecureServerInsecureClient(unittest.TestCase): CLIENT_METADATA_BIN_VALUE = b'\0'*1000 SERVER_INITIAL_METADATA_KEY = b'init_me_me_me' SERVER_INITIAL_METADATA_VALUE = b'whodawha?' - SERVER_TRAILING_METADATA_KEY = b'California_is_in_a_drought' + SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought' SERVER_TRAILING_METADATA_VALUE = b'zomg it is' SERVER_STATUS_CODE = cygrpc.StatusCode.ok SERVER_STATUS_DETAILS = b'our work is never over' @@ -158,8 +159,8 @@ class InsecureServerInsecureClient(unittest.TestCase): self.assertEqual(cygrpc.CallError.ok, request_call_result) client_call_tag = object() - client_call = self.client_channel.create_call(self.client_completion_queue, - METHOD, HOST, cygrpc_deadline) + client_call = self.client_channel.create_call( + None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline) client_initial_metadata = cygrpc.Metadata([ cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), @@ -182,8 +183,9 @@ class InsecureServerInsecureClient(unittest.TestCase): self.assertIsInstance(request_event.operation_call, cygrpc.Call) self.assertIs(server_request_tag, request_event.tag) self.assertEqual(0, len(request_event.batch_operations)) - self.assertEqual(dict(client_initial_metadata), - dict(request_event.request_metadata)) + self.assertTrue( + test_common.metadata_transmitted(client_initial_metadata, + request_event.request_metadata)) self.assertEqual(METHOD, request_event.request_call_details.method) self.assertEqual(HOST, request_event.request_call_details.host) self.assertLess( @@ -218,13 +220,15 @@ class InsecureServerInsecureClient(unittest.TestCase): self.assertNotIn(client_result.type, found_client_op_types) found_client_op_types.add(client_result.type) if client_result.type == cygrpc.OperationType.receive_initial_metadata: - self.assertEqual(dict(server_initial_metadata), - dict(client_result.received_metadata)) + self.assertTrue( + test_common.metadata_transmitted(server_initial_metadata, + client_result.received_metadata)) elif client_result.type == cygrpc.OperationType.receive_message: self.assertEqual(RESPONSE, client_result.received_message.bytes()) elif client_result.type == cygrpc.OperationType.receive_status_on_client: - self.assertEqual(dict(server_trailing_metadata), - dict(client_result.received_metadata)) + self.assertTrue( + test_common.metadata_transmitted(server_trailing_metadata, + client_result.received_metadata)) self.assertEqual(SERVER_STATUS_DETAILS, client_result.received_status_details) self.assertEqual(SERVER_STATUS_CODE, client_result.received_status_code) diff --git a/src/python/grpcio_test/grpc_test/test_common.py b/src/python/grpcio_test/grpc_test/test_common.py index 44284be88b..29431bfb9d 100644 --- a/src/python/grpcio_test/grpc_test/test_common.py +++ b/src/python/grpcio_test/grpc_test/test_common.py @@ -46,19 +46,23 @@ def metadata_transmitted(original_metadata, transmitted_metadata): the same key. Args: - original_metadata: A metadata value used in a test of gRPC. + original_metadata: A metadata value used in a test of gRPC. An iterable over + iterables of length 2. transmitted_metadata: A metadata value corresponding to original_metadata - after having been transmitted via gRPC. + after having been transmitted via gRPC. An iterable over iterables of + length 2. Returns: A boolean indicating whether transmitted_metadata accurately reflects original_metadata after having been transmitted via gRPC. """ original = collections.defaultdict(list) - for key, value in original_metadata: + for key_value_pair in original_metadata: + key, value = tuple(key_value_pair) original[key].append(value) transmitted = collections.defaultdict(list) - for key, value in transmitted_metadata: + for key_value_pair in transmitted_metadata: + key, value = tuple(key_value_pair) transmitted[key].append(value) for key, values in original.iteritems(): diff --git a/src/python/grpcio_test/setup.cfg b/src/python/grpcio_test/setup.cfg index b32d3f5972..3be93cb918 100644 --- a/src/python/grpcio_test/setup.cfg +++ b/src/python/grpcio_test/setup.cfg @@ -1,3 +1,2 @@ [pytest] -norecursedirs = _cython python_files = *_test.py diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py index fe36bc9232..0f43b4a638 100644 --- a/src/python/grpcio_test/setup.py +++ b/src/python/grpcio_test/setup.py @@ -40,7 +40,7 @@ os.chdir(os.path.dirname(os.path.abspath(__file__))) # Break import-style to ensure we can actually find our commands module. import commands -_PACKAGES = setuptools.find_packages('.', exclude=['*._cython', '*._cython.*']) +_PACKAGES = setuptools.find_packages('.') _PACKAGE_DIRECTORIES = { '': '.', diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml index 312bdca384..dd57ab6082 100644 --- a/src/ruby/.rubocop.yml +++ b/src/ruby/.rubocop.yml @@ -9,3 +9,9 @@ AllCops: - 'bin/math_services.rb' - 'pb/grpc/health/v1alpha/*' - 'pb/test/**/*' + +Metrics/CyclomaticComplexity: + Max: 8 + +Metrics/PerceivedComplexity: + Max: 8 diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 3740ac52da..228c500672 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -416,6 +416,8 @@ module GRPC until stopped? begin an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE) + break if (!an_rpc.nil?) && an_rpc.call.nil? + c = new_active_server_call(an_rpc) unless c.nil? mth = an_rpc.method.to_sym |