diff options
Diffstat (limited to 'src')
72 files changed, 1847 insertions, 1091 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b09bf99677..f8c6fda340 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -140,7 +140,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file, printer->Print(vars, "namespace grpc {\n"); printer->Print(vars, "class CompletionQueue;\n"); printer->Print(vars, "class Channel;\n"); - printer->Print(vars, "class RpcService;\n"); printer->Print(vars, "class ServerCompletionQueue;\n"); printer->Print(vars, "class ServerContext;\n"); printer->Print(vars, "} // namespace grpc\n\n"); @@ -187,19 +186,21 @@ void PrintHeaderClientMethodInterfaces( } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, - "std::unique_ptr< ::grpc::ClientWriterInterface< $Request$>>" + "std::unique_ptr< ::grpc::ClientWriterInterface< " + "$Request$>>" " $Method$(" "::grpc::ClientContext* context, $Response$* response) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientWriterInterface< $Request$>>" - "($Method$Raw(context, response));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientWriterInterface< $Request$>>" + "($Method$Raw(context, response));\n"); printer->Outdent(); printer->Print("}\n"); printer->Print( *vars, - "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>" + "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< " + "$Request$>>" " Async$Method$(::grpc::ClientContext* context, $Response$* " "response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); @@ -213,19 +214,21 @@ void PrintHeaderClientMethodInterfaces( } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, - "std::unique_ptr< ::grpc::ClientReaderInterface< $Response$>>" + "std::unique_ptr< ::grpc::ClientReaderInterface< " + "$Response$>>" " $Method$(::grpc::ClientContext* context, const $Request$& request)" " {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientReaderInterface< $Response$>>" - "($Method$Raw(context, request));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientReaderInterface< $Response$>>" + "($Method$Raw(context, request));\n"); printer->Outdent(); printer->Print("}\n"); printer->Print( *vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> " + "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< " + "$Response$>> " "Async$Method$(" "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); @@ -242,36 +245,37 @@ void PrintHeaderClientMethodInterfaces( "$Request$, $Response$>> " "$Method$(::grpc::ClientContext* context) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientReaderWriterInterface< $Request$, $Response$>>(" - "$Method$Raw(context));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientReaderWriterInterface< " + "$Request$, $Response$>>(" + "$Method$Raw(context));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< " - "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print(*vars, + "std::unique_ptr< " + "::grpc::ClientAsyncReaderWriterInterface< " + "$Request$, $Response$>> " + "Async$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>(" - "Async$Method$Raw(context, cq, tag));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderWriterInterface< " + "$Request$, $Response$>>(" + "Async$Method$Raw(context, cq, tag));\n"); printer->Outdent(); printer->Print("}\n"); } } else { if (method->NoStreaming()) { - printer->Print( - *vars, - "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) = 0;\n"); + printer->Print(*vars, + "virtual " + "::grpc::ClientAsyncResponseReaderInterface< " + "$Response$>* " + "Async$Method$Raw(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) = 0;\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -286,7 +290,8 @@ void PrintHeaderClientMethodInterfaces( } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, - "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw(" + "virtual ::grpc::ClientReaderInterface< $Response$>* " + "$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) = 0;\n"); printer->Print( *vars, @@ -451,7 +456,8 @@ void PrintHeaderClientMethodData(grpc_generator::Printer *printer, const grpc_generator::Method *method, std::map<grpc::string, grpc::string> *vars) { (*vars)["Method"] = method->name(); - printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n"); + printer->Print(*vars, + "const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n"); } void PrintHeaderServerMethodSync(grpc_generator::Printer *printer, @@ -623,7 +629,7 @@ void PrintHeaderServerMethodStreamedUnary( printer->Print(*vars, "WithStreamedUnaryMethod_$Method$() {\n" " ::grpc::Service::MarkMethodStreamed($Idx$,\n" - " new ::grpc::StreamedUnaryHandler< $Request$, " + " new ::grpc::internal::StreamedUnaryHandler< $Request$, " "$Response$>(std::bind" "(&WithStreamedUnaryMethod_$Method$<BaseClass>::" "Streamed$Method$, this, std::placeholders::_1, " @@ -671,15 +677,16 @@ void PrintHeaderServerMethodSplitStreaming( "{}\n"); printer->Print(" public:\n"); printer->Indent(); - printer->Print(*vars, - "WithSplitStreamingMethod_$Method$() {\n" - " ::grpc::Service::MarkMethodStreamed($Idx$,\n" - " new ::grpc::SplitServerStreamingHandler< $Request$, " - "$Response$>(std::bind" - "(&WithSplitStreamingMethod_$Method$<BaseClass>::" - "Streamed$Method$, this, std::placeholders::_1, " - "std::placeholders::_2)));\n" - "}\n"); + printer->Print( + *vars, + "WithSplitStreamingMethod_$Method$() {\n" + " ::grpc::Service::MarkMethodStreamed($Idx$,\n" + " new ::grpc::internal::SplitServerStreamingHandler< $Request$, " + "$Response$>(std::bind" + "(&WithSplitStreamingMethod_$Method$<BaseClass>::" + "Streamed$Method$, this, std::placeholders::_1, " + "std::placeholders::_2)));\n" + "}\n"); printer->Print(*vars, "~WithSplitStreamingMethod_$Method$() override {\n" " BaseClassMustBeDerivedFromService(this);\n" @@ -819,7 +826,8 @@ void PrintHeaderService(grpc_generator::Printer *printer, " {\n public:\n"); printer->Indent(); printer->Print( - "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n"); + "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& " + "channel);\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i).get(), vars, true); } @@ -1082,11 +1090,12 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::Status $ns$$Service$::Stub::$Method$(" "::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) {\n"); - printer->Print(*vars, - " return ::grpc::BlockingUnaryCall(channel_.get(), " - "rpcmethod_$Method$_, " - "context, request, response);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::internal::BlockingUnaryCall(channel_.get(), " + "rpcmethod_$Method$_, " + "context, request, response);\n" + "}\n\n"); printer->Print( *vars, "::grpc::ClientAsyncResponseReader< $Response$>* " @@ -1095,7 +1104,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::CompletionQueue* cq) {\n"); printer->Print(*vars, " return " - "::grpc::ClientAsyncResponseReader< $Response$>::Create(" + "::grpc::ClientAsyncResponseReader< $Response$>::" + "internal::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request);\n" @@ -1105,19 +1115,21 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientWriter< $Request$>* " "$ns$$Service$::Stub::$Method$Raw(" "::grpc::ClientContext* context, $Response$* response) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientWriter< $Request$>(" - "channel_.get(), " - "rpcmethod_$Method$_, " - "context, response);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::ClientWriter< $Request$>::internal::Create(" + "channel_.get(), " + "rpcmethod_$Method$_, " + "context, response);\n" + "}\n\n"); printer->Print(*vars, "::grpc::ClientAsyncWriter< $Request$>* " "$ns$$Service$::Stub::Async$Method$Raw(" "::grpc::ClientContext* context, $Response$* response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " return ::grpc::ClientAsyncWriter< $Request$>::Create(" + " return ::grpc::ClientAsyncWriter< $Request$>::" + "internal::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, response, tag);\n" @@ -1128,19 +1140,21 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReader< $Response$>* " "$ns$$Service$::Stub::$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientReader< $Response$>(" - "channel_.get(), " - "rpcmethod_$Method$_, " - "context, request);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::ClientReader< $Response$>::internal::Create(" + "channel_.get(), " + "rpcmethod_$Method$_, " + "context, request);\n" + "}\n\n"); printer->Print(*vars, "::grpc::ClientAsyncReader< $Response$>* " "$ns$$Service$::Stub::Async$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " return ::grpc::ClientAsyncReader< $Response$>::Create(" + " return ::grpc::ClientAsyncReader< $Response$>::" + "internal::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request, tag);\n" @@ -1151,8 +1165,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n"); printer->Print(*vars, - " return new ::grpc::ClientReaderWriter< " - "$Request$, $Response$>(" + " return ::grpc::ClientReaderWriter< " + "$Request$, $Response$>::internal::Create(" "channel_.get(), " "rpcmethod_$Method$_, " "context);\n" @@ -1162,14 +1176,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print( - *vars, - " return " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, tag);\n" - "}\n\n"); + printer->Print(*vars, + " return " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::" + "internal::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, tag);\n" + "}\n\n"); } } @@ -1279,7 +1293,7 @@ void PrintSourceService(grpc_generator::Printer *printer, printer->Print(*vars, ", rpcmethod_$Method$_(" "$prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::$StreamingType$, " + "::grpc::internal::RpcMethod::$StreamingType$, " "channel" ")\n"); } @@ -1302,38 +1316,38 @@ void PrintSourceService(grpc_generator::Printer *printer, if (method->NoStreaming()) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::NORMAL_RPC,\n" - " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " + " ::grpc::internal::RpcMethod::NORMAL_RPC,\n" + " new ::grpc::internal::RpcMethodHandler< $ns$$Service$::Service, " "$Request$, " "$Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ClientOnlyStreaming(method.get())) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::CLIENT_STREAMING,\n" - " new ::grpc::ClientStreamingHandler< " + " ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n" + " new ::grpc::internal::ClientStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ServerOnlyStreaming(method.get())) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::SERVER_STREAMING,\n" - " new ::grpc::ServerStreamingHandler< " + " ::grpc::internal::RpcMethod::SERVER_STREAMING,\n" + " new ::grpc::internal::ServerStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (method->BidiStreaming()) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::BIDI_STREAMING,\n" - " new ::grpc::BidiStreamingHandler< " + " ::grpc::internal::RpcMethod::BIDI_STREAMING,\n" + " new ::grpc::internal::BidiStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } @@ -1501,7 +1515,8 @@ void PrintMockClientMethods(grpc_generator::Printer *printer, printer->Print( *vars, "MOCK_METHOD3(Async$Method$Raw, " - "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*" + "::grpc::ClientAsyncReaderWriterInterface<$Request$, " + "$Response$>*" "(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, " "void* tag));\n"); } diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index de516ab4c9..7add432589 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -52,6 +52,8 @@ /* Client channel implementation */ +grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false); + /************************************************************************* * METHOD-CONFIG TABLE */ @@ -241,6 +243,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_REF(error)); } } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand, + grpc_connectivity_state_name(state)); + } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); } @@ -251,6 +257,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, grpc_connectivity_state publish_state = w->state; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, + w->lb_policy, grpc_connectivity_state_name(w->state)); + } if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); @@ -263,7 +273,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); } } - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } @@ -273,7 +282,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); - w->chand = chand; GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, grpc_combiner_scheduler(chand->combiner)); @@ -283,6 +291,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +static void start_resolving_locked(grpc_exec_ctx *exec_ctx, + channel_data *chand) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); + } + GPR_ASSERT(!chand->started_resolving); + chand->started_resolving = true; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); +} + typedef struct { char *server_name; grpc_server_retry_throttle_data *retry_throttle_data; @@ -345,8 +365,13 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, + grpc_error_string(error)); + } // Extract the following fields from the resolver result, if non-NULL. char *lb_policy_name = NULL; + bool lb_policy_name_changed = false; grpc_lb_policy *new_lb_policy = NULL; char *service_config_json = NULL; grpc_server_retry_throttle_data *retry_throttle_data = NULL; @@ -394,10 +419,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // taking a lock on chand->info_mu, because this function is the // only thing that modifies its value, and it can only be invoked // once at any given time. - const bool lb_policy_type_changed = + lb_policy_name_changed = chand->info_lb_policy_name == NULL || strcmp(chand->info_lb_policy_name, lb_policy_name) != 0; - if (chand->lb_policy != NULL && !lb_policy_type_changed) { + if (chand->lb_policy != NULL && !lb_policy_name_changed) { // Continue using the same LB policy. Update with new addresses. grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); } else { @@ -445,6 +470,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p: resolver result: lb_policy_name=\"%s\"%s, " + "service_config=\"%s\"", + chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "", + service_config_json); + } // Now swap out fields in chand. Note that the new values may still // be NULL if (e.g.) the resolver failed to return results or the // results did not contain the necessary data. @@ -479,6 +511,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, if (new_lb_policy != NULL || error != GRPC_ERROR_NONE || chand->resolver == NULL) { if (chand->lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand, + chand->lb_policy); + } grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, chand->interested_parties); @@ -489,7 +525,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // Now that we've swapped out the relevant fields of chand, check for // error or shutdown. if (error != GRPC_ERROR_NONE || chand->resolver == NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand); + } if (chand->resolver != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand); + } grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; @@ -510,6 +552,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_error *state_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); if (new_lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand); + } GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy, &state_error); @@ -772,7 +817,9 @@ typedef struct client_channel_call_data { gpr_atm subchannel_call_or_error; gpr_arena *arena; - bool pick_pending; + grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending. + grpc_closure lb_pick_closure; + grpc_connected_subchannel *connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; @@ -837,8 +884,15 @@ static void waiting_for_pick_batches_add_locked( } static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, - call_data *calld, + grpc_call_element *elem, grpc_error *error) { + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + grpc_error_string(error)); + } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error)); @@ -848,14 +902,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, } static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx, - call_data *calld) { + grpc_call_element *elem) { + call_data *calld = elem->call_data; if (calld->waiting_for_pick_batches_count == 0) return; call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { - waiting_for_pick_batches_fail_locked(exec_ctx, calld, + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(coe.error)); return; } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR + " pending batches to subchannel_call=%p", + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + coe.subchannel_call); + } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, calld->waiting_for_pick_batches[i]); @@ -869,6 +930,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", + chand, calld); + } if (chand->retry_throttle_data != NULL) { calld->retry_throttle_data = grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); @@ -895,7 +960,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, } static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, - call_data *calld, grpc_error *error) { + grpc_call_element *elem, + grpc_error *error) { + call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call = NULL; const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, @@ -906,13 +973,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", + elem->channel_data, calld, subchannel_call, + grpc_error_string(new_error)); + } GPR_ASSERT(set_call_or_error( calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error); } else { - waiting_for_pick_batches_resume_locked(exec_ctx, calld); + waiting_for_pick_batches_resume_locked(exec_ctx, elem); } GRPC_ERROR_UNREF(error); } @@ -922,8 +994,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, grpc_error *error) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(calld->pick_pending); - calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); call_or_error coe = get_call_or_error(calld); @@ -935,8 +1005,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, "Call dropped by load balancing policy") : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to create subchannel", &error, 1); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failed to create subchannel: error=%s", chand, + calld, grpc_error_string(failure)); + } set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure); } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ grpc_error *child_errors[] = {error, coe.error}; @@ -950,10 +1025,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED); } - waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: cancelled before subchannel became ready: %s", + chand, calld, grpc_error_string(cancellation_error)); + } + waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error); } else { /* Create call on subchannel. */ - create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); GRPC_ERROR_UNREF(error); @@ -983,41 +1063,77 @@ typedef struct { grpc_closure closure; } pick_after_resolver_result_args; -static void continue_picking_after_resolver_result_locked( - grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error) { pick_after_resolver_result_args *args = arg; if (args->cancelled) { /* cancelled, do nothing */ - } else if (error != GRPC_ERROR_NONE) { - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "call cancelled before resolver result"); + } } else { - if (pick_subchannel_locked(exec_ctx, args->elem)) { - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); + channel_data *chand = args->elem->channel_data; + call_data *calld = args->elem->call_data; + if (error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", + chand, calld); + } + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + } else { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", + chand, calld); + } + if (pick_subchannel_locked(exec_ctx, args->elem)) { + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); + } } } gpr_free(args); } -static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_error *error) { +static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: deferring pick pending resolver result", chand, + calld); + } + pick_after_resolver_result_args *args = + (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); + args->elem = elem; + GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, + args, grpc_combiner_scheduler(chand->combiner)); + grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, + &args->closure, GRPC_ERROR_NONE); +} + +static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, - &calld->connected_subchannel, - GRPC_ERROR_REF(error)); - } // If we don't yet have a resolver result, then a closure for - // continue_picking_after_resolver_result_locked() will have been added to + // pick_after_resolver_result_done_locked() will have been added to // chand->waiting_for_resolver_result_closures, and it may not be invoked // until after this call has been destroyed. We mark the operation as - // cancelled, so that when continue_picking_after_resolver_result_locked() + // cancelled, so that when pick_after_resolver_result_done_locked() // is called, it will be a no-op. We also immediately invoke // subchannel_ready_locked() to propagate the error back to the caller. for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head; closure != NULL; closure = closure->next_data.next) { pick_after_resolver_result_args *args = closure->cb_arg; if (!args->cancelled && args->elem == elem) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: " + "cancelling pick waiting for resolver result", + chand, calld); + } args->cancelled = true; subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -1027,24 +1143,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_UNREF(error); } -// State for pick callback that holds a reference to the LB policy -// from which the pick was requested. -typedef struct { - grpc_lb_policy *lb_policy; - grpc_call_element *elem; - grpc_closure closure; -} pick_callback_args; - // Callback invoked by grpc_lb_policy_pick_locked() for async picks. // Unrefs the LB policy after invoking subchannel_ready_locked(). static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - pick_callback_args *args = arg; - GPR_ASSERT(args != NULL); - GPR_ASSERT(args->lb_policy != NULL); - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); - GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel"); - gpr_free(args); + grpc_call_element *elem = arg; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", + chand, calld); + } + GPR_ASSERT(calld->lb_policy != NULL); + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; + subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } // Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). @@ -1055,23 +1168,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, const grpc_lb_policy_pick_args *inputs) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", + chand, calld, chand->lb_policy); + } + // Keep a ref to the LB policy in calld while the pick is pending. GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); - pick_args->lb_policy = chand->lb_policy; - pick_args->elem = elem; - GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args, + calld->lb_policy = chand->lb_policy; + GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); const bool pick_done = grpc_lb_policy_pick_locked( exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel, - calld->subchannel_call_context, NULL, &pick_args->closure); + calld->subchannel_call_context, NULL, &calld->lb_pick_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel"); - gpr_free(pick_args); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", + chand, calld); + } + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; } return pick_done; } +static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + GPR_ASSERT(calld->lb_policy != NULL); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", + chand, calld, calld->lb_policy); + } + grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, + &calld->connected_subchannel, error); +} + static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { GPR_TIMER_BEGIN("pick_subchannel", 0); @@ -1107,20 +1241,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs); } else if (chand->resolver != NULL) { if (!chand->started_resolving) { - chand->started_resolving = true; - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); + start_resolving_locked(exec_ctx, chand); } - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); - args->elem = elem; - GRPC_CLOSURE_INIT(&args->closure, - continue_picking_after_resolver_result_locked, args, - grpc_combiner_scheduler(chand->combiner)); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &args->closure, GRPC_ERROR_NONE); + pick_after_resolver_result_start_locked(exec_ctx, elem); } else { subchannel_ready_locked( exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); @@ -1133,63 +1256,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); - grpc_transport_stream_op_batch *op = arg; - grpc_call_element *elem = op->handler_private.extra_arg; + grpc_transport_stream_op_batch *batch = arg; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; /* need to recheck that another thread hasn't set the call */ call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(coe.error)); + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); goto done; } if (coe.subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); goto done; } // Add to waiting-for-pick list. If we succeed in getting a // subchannel call below, we'll handle this batch (along with any // other waiting batches) in waiting_for_pick_batches_resume_locked(). - waiting_for_pick_batches_add_locked(calld, op); - /* if this is a cancellation, then we can raise our cancelled flag */ - if (op->cancel_stream) { - grpc_error *error = op->payload->cancel_stream.cancel_error; + waiting_for_pick_batches_add_locked(calld, batch); + // If this is a cancellation, cancel the pending pick (if any) and + // fail any pending batches. + if (batch->cancel_stream) { + grpc_error *error = batch->payload->cancel_stream.cancel_error; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, + calld, grpc_error_string(error)); + } /* Stash a copy of cancel_error in our call data, so that we can use it for subsequent operations. This ensures that if the call is - cancelled before any ops are passed down (e.g., if the deadline + cancelled before any batches are passed down (e.g., if the deadline is in the past when the call starts), we can return the right - error to the caller when the first op does get passed down. */ + error to the caller when the first batch does get passed down. */ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - if (calld->pick_pending) { - cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + if (calld->lb_policy != NULL) { + pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + } else { + pick_after_resolver_result_cancel_locked(exec_ctx, elem, + GRPC_ERROR_REF(error)); } - waiting_for_pick_batches_fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(error)); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); goto done; } /* if we don't have a subchannel, try to get one */ - if (!calld->pick_pending && calld->connected_subchannel == NULL && - op->send_initial_metadata) { - calld->initial_metadata_payload = op->payload; - calld->pick_pending = true; + if (batch->send_initial_metadata) { + GPR_ASSERT(calld->connected_subchannel == NULL); + calld->initial_metadata_payload = batch->payload; GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ if (pick_subchannel_locked(exec_ctx, elem)) { // Pick was returned synchronously. - calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, error); } else { // Create subchannel call. - create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE); + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE); } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, @@ -1232,47 +1369,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { If it has, we proceed on the fast path. */ static void cc_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + if (GRPC_TRACER_ON(grpc_client_channel_trace) || + GRPC_TRACER_ON(grpc_trace_channel)) { + grpc_call_log_op(GPR_INFO, elem, batch); + } if (chand->deadline_checking_enabled) { grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, - op); + batch); } // Intercept on_complete for recv_trailing_metadata so that we can // check retry throttle status. - if (op->recv_trailing_metadata) { - GPR_ASSERT(op->on_complete != NULL); - calld->original_on_complete = op->on_complete; + if (batch->recv_trailing_metadata) { + GPR_ASSERT(batch->on_complete != NULL); + calld->original_on_complete = batch->on_complete; GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, grpc_schedule_on_exec_ctx); - op->on_complete = &calld->on_complete; + batch->on_complete = &calld->on_complete; } /* try to (atomically) get the call */ call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(coe.error)); - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); - /* early out */ - return; + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); + goto done; } if (coe.subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); - /* early out */ - return; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); + goto done; } /* we failed; lock and figure out what to do */ + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld); + } GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); - op->handler_private.extra_arg = elem; + batch->handler_private.extra_arg = elem; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, - start_transport_stream_op_batch_locked, op, + exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure, + start_transport_stream_op_batch_locked, batch, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); +done: GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1317,7 +1466,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, "client_channel_destroy_call"); } - GPR_ASSERT(!calld->pick_pending); + GPR_ASSERT(calld->lb_policy == NULL); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1366,11 +1515,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { chand->exit_idle_when_lb_policy_arrives = true; if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); + start_resolving_locked(exec_ctx, chand); } } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 63f7c29940..c99f0092e9 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -23,6 +23,8 @@ #include "src/core/ext/filters/client_channel/resolver.h" #include "src/core/lib/channel/channel_stack.h" +extern grpc_tracer_flag grpc_client_channel_trace; + // Channel arg key for server URI string. #define GRPC_ARG_SERVER_URI "grpc.server_uri" diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c index 60e77d6268..6f133a648b 100644 --- a/src/core/ext/filters/client_channel/client_channel_plugin.c +++ b/src/core/ext/filters/client_channel/client_channel_plugin.c @@ -78,6 +78,7 @@ void grpc_client_channel_init(void) { GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter, (void *)&grpc_client_channel_filter); grpc_http_connect_register_handshaker_factory(); + grpc_register_tracer("client_channel", &grpc_client_channel_trace); #ifndef NDEBUG grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount); #endif diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 8e9d6b0f47..fbef79ec31 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -158,6 +158,7 @@ static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, if (sd->user_data != NULL) { GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + sd->user_data = NULL; } } gpr_free(subchannel_list->subchannels); @@ -195,11 +196,28 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, const char *reason) { + if (subchannel_list->shutting_down) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down", + (void *)subchannel_list); + } + return; + }; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p", + (void *)subchannel_list); + } GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { subchannel_data *sd = &subchannel_list->subchannels[i]; if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "Unsubscribing from subchannel %p as part of shutting down " + "subchannel_list %p", + (void *)sd->subchannel, (void *)subchannel_list); + } grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); @@ -228,13 +246,14 @@ static size_t get_next_ready_subchannel_index_locked( const size_t index = (i + p->last_ready_subchannel_index + 1) % p->subchannel_list->num_subchannels; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " - "state=%d", - (void *)p, - (void *)p->subchannel_list->subchannels[index].subchannel, - (void *)p->subchannel_list, (unsigned long)index, - p->subchannel_list->subchannels[index].curr_connectivity_state); + gpr_log( + GPR_DEBUG, + "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " + "state=%s", + (void *)p, (void *)p->subchannel_list->subchannels[index].subchannel, + (void *)p->subchannel_list, (unsigned long)index, + grpc_connectivity_state_name( + p->subchannel_list->subchannels[index].curr_connectivity_state)); } if (p->subchannel_list->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { @@ -511,16 +530,27 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->subchannel_list->policy; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " + "prev_state=%s new_state=%s p->shutdown=%d " + "sd->subchannel_list->shutting_down=%d error=%s", + (void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list, + grpc_connectivity_state_name(sd->prev_connectivity_state), + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), + p->shutdown, sd->subchannel_list->shutting_down, + grpc_error_string(error)); + } // If the policy is shutting down, unref and return. if (p->shutdown) { rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); return; } - if (sd->subchannel_list->shutting_down) { + if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { // the subchannel list associated with sd has been discarded. This callback // corresponds to the unsubscription. - GPR_ASSERT(error == GRPC_ERROR_CANCELLED); rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); return; @@ -536,13 +566,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] connectivity changed for subchannel %p: " - "prev_state=%d new_state=%d", - (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state, - sd->curr_connectivity_state); - } // Update state counters and determine new overall state. update_state_counters_locked(sd); sd->prev_connectivity_state = sd->curr_connectivity_state; @@ -556,6 +579,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (sd->user_data != NULL) { GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + sd->user_data = NULL; } if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { /* the policy is shutting down. Flush all the pending picks... */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c index af3391a731..5ea75f0554 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c @@ -132,7 +132,7 @@ static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - dns_resolver *r = arg; + dns_resolver *r = (dns_resolver *)arg; r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { @@ -146,7 +146,7 @@ static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - dns_resolver *r = arg; + dns_resolver *r = (dns_resolver *)arg; grpc_channel_args *result = NULL; GPR_ASSERT(r->resolving); r->resolving = false; @@ -241,7 +241,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, char *path = args->uri->path; if (path[0] == '/') ++path; // Create resolver. - dns_resolver *r = gpr_zalloc(sizeof(dns_resolver)); + dns_resolver *r = (dns_resolver *)gpr_zalloc(sizeof(dns_resolver)); grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c index 7b4fe38272..7ceb8f40a1 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c @@ -177,7 +177,8 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, return NULL; } /* Instantiate resolver. */ - sockaddr_resolver *r = gpr_zalloc(sizeof(sockaddr_resolver)); + sockaddr_resolver *r = + (sockaddr_resolver *)gpr_zalloc(sizeof(sockaddr_resolver)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner); diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c index ced025e2e2..6789903c95 100644 --- a/src/core/ext/filters/deadline/deadline_filter.c +++ b/src/core/ext/filters/deadline/deadline_filter.c @@ -37,8 +37,8 @@ // Timer callback. static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_call_element* elem = arg; - grpc_deadline_state* deadline_state = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)arg; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (error != GRPC_ERROR_CANCELLED) { grpc_call_element_signal_error( exec_ctx, elem, @@ -57,7 +57,7 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) { return; } - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; grpc_deadline_timer_state cur_state; grpc_closure* closure = NULL; retry: @@ -112,7 +112,7 @@ static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, // Callback run when the call is complete. static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_deadline_state* deadline_state = arg; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)arg; cancel_timer_if_needed(exec_ctx, deadline_state); // Invoke the next callback. GRPC_CLOSURE_RUN(exec_ctx, deadline_state->next_on_complete, @@ -145,7 +145,7 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack, gpr_timespec deadline) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; deadline_state->call_stack = call_stack; // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. @@ -169,13 +169,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); } void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); start_timer_if_needed(exec_ctx, elem, new_deadline); } @@ -183,7 +183,7 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_client_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, deadline_state); } else { @@ -256,8 +256,8 @@ static void client_start_transport_stream_op_batch( // Callback for receiving initial metadata on the server. static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_call_element* elem = arg; - server_call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)arg; + server_call_data* calld = (server_call_data*)elem->call_data; // Get deadline from metadata and start the timer if needed. start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline); // Invoke the next callback. @@ -269,7 +269,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, static void server_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - server_call_data* calld = elem->call_data; + server_call_data* calld = (server_call_data*)elem->call_data; if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); } else { @@ -341,8 +341,8 @@ static bool maybe_add_deadline_filter(grpc_exec_ctx* exec_ctx, void* arg) { return grpc_deadline_checking_enabled( grpc_channel_stack_builder_get_channel_arguments(builder)) - ? grpc_channel_stack_builder_prepend_filter(builder, arg, NULL, - NULL) + ? grpc_channel_stack_builder_prepend_filter( + builder, (const grpc_channel_filter*)arg, NULL, NULL) : true; } diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 35304f8150..7d748b9c32 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -108,7 +108,7 @@ static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; /* Decrease call_count. If there are no active calls at this time, max_idle_timer will start here. If the number of active calls is not 0, max_idle_timer will start after all the active calls end. */ @@ -119,7 +119,7 @@ static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); @@ -140,7 +140,7 @@ static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); @@ -156,7 +156,7 @@ static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; if (error == GRPC_ERROR_NONE) { /* Prevent the max idle timer from being set again */ gpr_atm_no_barrier_fetch_add(&chand->call_count, 1); @@ -176,7 +176,7 @@ static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg, static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_timer_pending = false; gpr_mu_unlock(&chand->max_age_timer_mu); @@ -200,7 +200,7 @@ static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = false; gpr_mu_unlock(&chand->max_age_timer_mu); @@ -220,7 +220,7 @@ static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op* op = grpc_make_transport_op(NULL); op->on_connectivity_state_change = &chand->channel_connectivity_changed, @@ -264,7 +264,7 @@ static int add_random_max_connection_age_jitter(int value) { static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; increase_call_count(exec_ctx, chand); return GRPC_ERROR_NONE; } @@ -281,7 +281,7 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; gpr_mu_init(&chand->max_age_timer_mu); chand->max_age_timer_pending = false; chand->max_age_grace_timer_pending = false; diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c index 9bb565ed6d..846c7df69a 100644 --- a/src/core/ext/filters/message_size/message_size_filter.c +++ b/src/core/ext/filters/message_size/message_size_filter.c @@ -60,7 +60,8 @@ static void* message_size_limits_create_from_json(const grpc_json* json) { if (max_response_message_bytes == -1) return NULL; } } - message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); + message_size_limits* value = + (message_size_limits*)gpr_malloc(sizeof(message_size_limits)); value->max_send_size = max_request_message_bytes; value->max_recv_size = max_response_message_bytes; return value; @@ -88,8 +89,8 @@ typedef struct channel_data { // receive message size. static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { - grpc_call_element* elem = user_data; - call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)user_data; + call_data* calld = (call_data*)elem->call_data; if (*calld->recv_message != NULL && calld->limits.max_recv_size >= 0 && (*calld->recv_message)->length > (size_t)calld->limits.max_recv_size) { char* message_string; @@ -117,7 +118,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, static void start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; // Check max send message size. if (op->send_message && calld->limits.max_send_size >= 0 && op->payload->send_message.send_message->length > @@ -149,8 +150,8 @@ static void start_transport_stream_op_batch( static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - channel_data* chand = elem->channel_data; - call_data* calld = elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; calld->next_recv_message_ready = NULL; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); @@ -160,8 +161,9 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // size to the receive limit. calld->limits = chand->limits; if (chand->method_limit_table != NULL) { - message_size_limits* limits = grpc_method_config_table_get( - exec_ctx, chand->method_limit_table, args->path); + message_size_limits* limits = + (message_size_limits*)grpc_method_config_table_get( + exec_ctx, chand->method_limit_table, args->path); if (limits != NULL) { if (limits->max_send_size >= 0 && (limits->max_send_size < calld->limits.max_send_size || @@ -220,7 +222,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; chand->limits = get_message_size_limits(args->channel_args); // Get method config table from channel args. const grpc_arg* channel_arg = @@ -243,7 +245,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, // Destructor for channel_data. static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; grpc_slice_hash_table_unref(exec_ctx, chand->method_limit_table); } diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c index 8b3fff5fa3..b4d2cb4b8c 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c @@ -52,8 +52,8 @@ static bool get_user_agent_mdelem(const grpc_metadata_batch* batch, // Callback invoked when we receive an initial metadata. static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { - grpc_call_element* elem = user_data; - call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)user_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_ERROR_NONE == error) { grpc_mdelem md; @@ -75,7 +75,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, static void start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; // Inject callback for receiving initial metadata if (op->recv_initial_metadata) { @@ -103,7 +103,7 @@ static void start_transport_stream_op_batch( static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; calld->next_recv_initial_metadata_ready = NULL; calld->workaround_active = false; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, diff --git a/src/core/ext/filters/workarounds/workaround_utils.c b/src/core/ext/filters/workarounds/workaround_utils.c index bc76753a8a..e600fbee67 100644 --- a/src/core/ext/filters/workarounds/workaround_utils.c +++ b/src/core/ext/filters/workarounds/workaround_utils.c @@ -33,7 +33,8 @@ grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md) { if (NULL != user_agent_md) { return user_agent_md; } - user_agent_md = gpr_malloc(sizeof(grpc_workaround_user_agent_md)); + user_agent_md = (grpc_workaround_user_agent_md *)gpr_malloc( + sizeof(grpc_workaround_user_agent_md)); for (int i = 0; i < GRPC_MAX_WORKAROUND_ID; i++) { if (ua_parser[i]) { user_agent_md->workaround_active[i] = ua_parser[i](md); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 6e8eadf7a1..f790267944 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -34,6 +34,7 @@ #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -280,8 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_slice_buffer_init(&t->outbuf); grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); - GRPC_CLOSURE_INIT(&t->write_action, write_action, t, - grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, @@ -399,6 +398,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -487,6 +488,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_permit_without_calls = (uint32_t)grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){0, 0, 1}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_OPTIMIZATION_TARGET)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s should be a string", + GRPC_ARG_OPTIMIZATION_TARGET); + } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == + strcmp(channel_args->args[i].value.string, "throughput")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; + } else { + gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", + GRPC_ARG_OPTIMIZATION_TARGET, + channel_args->args[i].value.string); + } } else { static const struct { const char *channel_arg_name; @@ -540,6 +558,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + GRPC_CLOSURE_INIT(&t->write_action, write_action, t, + t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT + ? grpc_executor_scheduler + : grpc_schedule_on_exec_ctx); + t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; t->ping_state.is_delayed_ping_timer_set = false; @@ -1464,6 +1487,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + if (!t->is_client) { + if (op->send_initial_metadata) { + gpr_timespec deadline = + op->payload->send_initial_metadata.send_initial_metadata->deadline; + GPR_ASSERT(0 == + gpr_time_cmp(gpr_inf_future(deadline.clock_type), deadline)); + } + if (op->send_trailing_metadata) { + gpr_timespec deadline = + op->payload->send_trailing_metadata.send_trailing_metadata->deadline; + GPR_ASSERT(0 == + gpr_time_cmp(gpr_inf_future(deadline.clock_type), deadline)); + } + } + if (GRPC_TRACER_ON(grpc_http_trace)) { char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op[s=%p]: %s", s, str); @@ -2129,8 +2167,8 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, double bw_dbl, double bdp_dbl) { - int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX); - int32_t target = GPR_MAX((int32_t)bw_dbl / 1000, bdp); + int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX); + int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp); // frame size is bounded [2^14,2^24-1] int32_t frame_size = GPR_CLAMP(target, 16384, 16777215); int64_t delta = (int64_t)frame_size - diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 9fa72ddbdf..a5b67e5aba 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -67,6 +67,11 @@ typedef enum { } grpc_chttp2_ping_type; typedef enum { + GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY, + GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT, +} grpc_chttp2_optimization_target; + +typedef enum { GRPC_CHTTP2_PCL_INITIATE = 0, GRPC_CHTTP2_PCL_NEXT, GRPC_CHTTP2_PCL_INFLIGHT, @@ -229,6 +234,8 @@ struct grpc_chttp2_transport { /** should we probe bdp? */ bool enable_bdp_probe; + grpc_chttp2_optimization_target opt_target; + /** various lists of streams */ grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index 2c91ad357c..9f82c480bc 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -57,9 +57,6 @@ #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) -/* Uncomment the following to enable extra checks on poll_object operations */ -/* #define PO_DEBUG */ - /* The maximum number of polling threads per polling island. By default no limit */ static int g_max_pollers_per_pi = INT_MAX; @@ -92,7 +89,7 @@ typedef enum { } poll_obj_type; typedef struct poll_obj { -#ifdef PO_DEBUG +#ifndef NDEBUG poll_obj_type obj_type; #endif gpr_mu mu; @@ -893,7 +890,7 @@ static grpc_fd *fd_create(int fd, const char *name) { * would be holding a lock to it anyway. */ gpr_mu_lock(&new_fd->po.mu); new_fd->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG new_fd->po.obj_type = POLL_OBJ_FD; #endif @@ -1171,7 +1168,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->po.mu); *mu = &pollset->po.mu; pollset->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pollset->po.obj_type = POLL_OBJ_POLLSET; #endif @@ -1625,7 +1622,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag, poll_obj_type item_type) { GPR_TIMER_BEGIN("add_poll_object", 0); -#ifdef PO_DEBUG +#ifndef NDEBUG GPR_ASSERT(item->obj_type == item_type); GPR_ASSERT(bag->obj_type == bag_type); #endif @@ -1784,7 +1781,7 @@ static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = gpr_malloc(sizeof(*pss)); gpr_mu_init(&pss->po.mu); pss->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pss->po.obj_type = POLL_OBJ_POLLSET_SET; #endif return pss; diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 5574838187..5690431759 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -103,6 +103,32 @@ typedef struct pollable { grpc_pollset_worker *root_worker; } pollable; +static const char *polling_obj_type_string(polling_obj_type t) { + switch (t) { + case PO_POLLING_GROUP: + return "polling_group"; + case PO_POLLSET_SET: + return "pollset_set"; + case PO_POLLSET: + return "pollset"; + case PO_FD: + return "fd"; + case PO_EMPTY_POLLABLE: + return "empty_pollable"; + case PO_COUNT: + return "<invalid:count>"; + } + return "<invalid>"; +} + +static char *pollable_desc(pollable *p) { + char *out; + gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", + polling_obj_type_string(p->po.type), p->po.group, p->epfd, + p->wakeup.read_fd); + return out; +} + static pollable g_empty_pollable; static void pollable_init(pollable *p, polling_obj_type type); @@ -472,7 +498,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { GPR_ASSERT(epfd != -1); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "add fd %p to pollable %p", fd, p); + gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } gpr_mu_lock(&fd->orphaned_mu); @@ -537,10 +563,18 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, if (worker->pollable != &pollset->pollable) { gpr_mu_lock(&worker->pollable->po.mu); } - if (worker->initialized_cv) { + if (worker->initialized_cv && worker != pollset->root_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", + pollset, worker, &pollset->pollable, worker->pollable); + } worker->kicked = true; gpr_cv_signal(&worker->cv); } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", + pollset, worker, &pollset->pollable, worker->pollable); + } append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), "pollset_shutdown"); } @@ -770,7 +804,9 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int timeout = poll_deadline_to_millis_timeout(deadline, now); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout); + char *desc = pollable_desc(p); + gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout); + gpr_free(desc); } if (timeout != 0) { @@ -985,10 +1021,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; if (pollset->current_pollable == &g_empty_pollable) { - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); + } /* empty pollable --> single fd pollable */ pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &fd->pollable; @@ -997,16 +1034,23 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); REF_BY(fd, 2, "pollset_pollable"); } else if (pollset->current_pollable == &pollset->pollable) { - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); + } append_error(&error, pollable_add_fd(pollset->current_pollable, fd), err_desc); } else if (pollset->current_pollable != &fd->pollable) { grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", pollset, fd, had_fd); + } + /* Introduce a spurious completion. + If we do not, then it may be that the fd-specific epoll set consumed + a completion without being polled, leading to a missed edge going up. */ + grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure); pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &pollset->pollable; if (append_error(&error, pollable_materialize(&pollset->pollable), diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 255e07010b..3c4ca9c7c5 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -54,9 +54,6 @@ gpr_log(GPR_INFO, __VA_ARGS__); \ } -/* Uncomment the following to enable extra checks on poll_object operations */ -/* #define PO_DEBUG */ - static int grpc_wakeup_signal = -1; static bool is_grpc_wakeup_signal_initialized = false; @@ -85,7 +82,7 @@ typedef enum { } poll_obj_type; typedef struct poll_obj { -#ifdef PO_DEBUG +#ifndef NDEBUG poll_obj_type obj_type; #endif gpr_mu mu; @@ -821,7 +818,7 @@ static grpc_fd *fd_create(int fd, const char *name) { * would be holding a lock to it anyway. */ gpr_mu_lock(&new_fd->po.mu); new_fd->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG new_fd->po.obj_type = POLL_OBJ_FD; #endif @@ -1079,7 +1076,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->po.mu); *mu = &pollset->po.mu; pollset->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pollset->po.obj_type = POLL_OBJ_POLLSET; #endif @@ -1416,7 +1413,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag, poll_obj_type item_type) { GPR_TIMER_BEGIN("add_poll_object", 0); -#ifdef PO_DEBUG +#ifndef NDEBUG GPR_ASSERT(item->obj_type == item_type); GPR_ASSERT(bag->obj_type == bag_type); #endif @@ -1575,7 +1572,7 @@ static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = gpr_malloc(sizeof(*pss)); gpr_mu_init(&pss->po.mu); pss->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pss->po.obj_type = POLL_OBJ_POLLSET_SET; #endif return pss; diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 45de289e45..a98b8e62db 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -54,7 +54,7 @@ static int retry_named_port_failure(int status, request *r, int retry_status; uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; - r->port = svc[i][1]; + r->port = gpr_strdup(svc[i][1]); retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb, r->host, r->port, r->hints); if (retry_status < 0 || getaddrinfo_cb == NULL) { @@ -127,6 +127,8 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error); grpc_exec_ctx_finish(&exec_ctx); gpr_free(r->hints); + gpr_free(r->host); + gpr_free(r->port); gpr_free(r); uv_freeaddrinfo(res); } diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index ab6832932f..2f1d237c07 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -48,6 +48,7 @@ typedef struct grpc_uv_tcp_connect { static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx, grpc_uv_tcp_connect *connect) { grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota); + gpr_free(connect->addr_name); gpr_free(connect); } @@ -105,6 +106,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { } done = (--connect->refs == 0); if (done) { + grpc_exec_ctx_flush(&exec_ctx); uv_tcp_connect_cleanup(&exec_ctx, connect); } GRPC_CLOSURE_SCHED(&exec_ctx, closure, error); @@ -140,6 +142,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, connect->resource_quota = resource_quota; uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; + connect->refs = 1; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2de0ea90e7..2ab836cc34 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -234,6 +234,7 @@ static void on_connect(uv_stream_t *server, int status) { sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, acceptor); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(peer_name_string); } } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 7c21b44e76..ff5fd3edc8 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -65,7 +65,10 @@ typedef struct { } grpc_tcp; static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + grpc_slice_unref_internal(exec_ctx, tcp->read_slice); grpc_resource_user_unref(exec_ctx, tcp->resource_user); + gpr_free(tcp->handle); + gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -115,13 +118,17 @@ static void uv_close_callback(uv_handle_t *handle) { grpc_exec_ctx_finish(&exec_ctx); } +static grpc_slice alloc_read_slice(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + return grpc_resource_user_slice_malloc(exec_ctx, resource_user, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE); +} + static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp *tcp = handle->data; (void)suggested_size; - tcp->read_slice = grpc_resource_user_slice_malloc( - &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); grpc_exec_ctx_finish(&exec_ctx); @@ -148,6 +155,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // Successful read sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); grpc_slice_buffer_add(tcp->read_slices, sub); + tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); error = GRPC_ERROR_NONE; if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t i; @@ -334,6 +342,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); @@ -350,6 +359,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -357,6 +367,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, uv_unref((uv_handle_t *)handle); #endif + grpc_exec_ctx_finish(&exec_ctx); return &tcp->base; } diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index bf73d2c685..e6a9eb0e86 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -44,41 +44,63 @@ grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false); grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false); +/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with + * deadlines earlier than 'queue_deadline" cap are maintained in the heap and + * others are maintained in the list (unordered). This helps to keep the number + * of elements in the heap low. + * + * The 'queue_deadline_cap' gets recomputed periodically based on the timer + * stats maintained in 'stats' and the relevant timers are then moved from the + * 'list' to 'heap' + */ typedef struct { gpr_mu mu; grpc_time_averaged_stats stats; /* All and only timers with deadlines <= this will be in the heap. */ gpr_atm queue_deadline_cap; + /* The deadline of the next timer due in this shard */ gpr_atm min_deadline; - /* Index in the g_shard_queue */ + /* Index of this timer_shard in the g_shard_queue */ uint32_t shard_queue_index; /* This holds all timers with deadlines < queue_deadline_cap. Timers in this list have the top bit of their deadline set to 0. */ grpc_timer_heap heap; /* This holds timers whose deadline is >= queue_deadline_cap. */ grpc_timer list; -} shard_type; +} timer_shard; + +/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address + * is hashed to select the timer shard to add the timer to */ +static timer_shard g_shards[NUM_SHARDS]; + +/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e + * the deadline of the next timer in each shard). + * Access to this is protected by g_shared_mutables.mu */ +static timer_shard *g_shard_queue[NUM_SHARDS]; + +/* Thread local variable that stores the deadline of the next timer the thread + * has last-seen. This is an optimization to prevent the thread from checking + * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock, + * an expensive operation) */ +GPR_TLS_DECL(g_last_seen_min_timer); struct shared_mutables { + /* The deadline of the next timer due across all timer shards */ gpr_atm min_timer; /* Allow only one run_some_expired_timers at once */ gpr_spinlock checker_mu; bool initialized; - /* Protects g_shard_queue */ + /* Protects g_shard_queue (and the shared_mutables struct itself) */ gpr_mu mu; } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); static struct shared_mutables g_shared_mutables = { .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, }; + static gpr_clock_type g_clock_type; -static shard_type g_shards[NUM_SHARDS]; -/* Protected by g_shared_mutables.mu */ -static shard_type *g_shard_queue[NUM_SHARDS]; static gpr_timespec g_start_time; -GPR_TLS_DECL(g_last_seen_min_timer); - static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { if (a > GPR_ATM_MAX - b) { return GPR_ATM_MAX; @@ -122,7 +144,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) { return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); } -static gpr_atm compute_min_deadline(shard_type *shard) { +static gpr_atm compute_min_deadline(timer_shard *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; @@ -142,7 +164,7 @@ void grpc_timer_list_init(gpr_timespec now) { grpc_register_tracer("timer_check", &grpc_timer_check_trace); for (i = 0; i < NUM_SHARDS; i++) { - shard_type *shard = &g_shards[i]; + timer_shard *shard = &g_shards[i]; gpr_mu_init(&shard->mu); grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5); @@ -161,7 +183,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { exec_ctx, GPR_ATM_MAX, NULL, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { - shard_type *shard = &g_shards[i]; + timer_shard *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); grpc_timer_heap_destroy(&shard->heap); } @@ -187,7 +209,7 @@ static void list_remove(grpc_timer *timer) { } static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { - shard_type *temp; + timer_shard *temp; temp = g_shard_queue[first_shard_queue_index]; g_shard_queue[first_shard_queue_index] = g_shard_queue[first_shard_queue_index + 1]; @@ -198,7 +220,7 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { first_shard_queue_index + 1; } -static void note_deadline_change(shard_type *shard) { +static void note_deadline_change(timer_shard *shard) { while (shard->shard_queue_index > 0 && shard->min_deadline < g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { @@ -215,7 +237,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec deadline, grpc_closure *closure, gpr_timespec now) { int is_first_timer = 0; - shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; + timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; @@ -303,7 +325,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { return; } - shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; + timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; gpr_mu_lock(&shard->mu); if (GRPC_TRACER_ON(grpc_timer_trace)) { gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, @@ -321,12 +343,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { gpr_mu_unlock(&shard->mu); } -/* This is called when the queue is empty and "now" has reached the - queue_deadline_cap. We compute a new queue deadline and then scan the map - for timers that fall at or under it. Returns true if the queue is no - longer empty. +/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving + all relevant timers in shard->list (i.e timers with deadlines earlier than + 'queue_deadline_cap') into into shard->heap. + Returns 'true' if shard->heap has atleast ONE element REQUIRES: shard->mu locked */ -static int refill_queue(shard_type *shard, gpr_atm now) { +static int refill_heap(timer_shard *shard, gpr_atm now) { /* Compute the new queue window width and bound by the limits: */ double computed_deadline_delta = grpc_time_averaged_stats_update_average(&shard->stats) * @@ -363,7 +385,7 @@ static int refill_queue(shard_type *shard, gpr_atm now) { /* This pops the next non-cancelled timer with deadline <= now from the queue, or returns NULL if there isn't one. REQUIRES: shard->mu locked */ -static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { +static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) { grpc_timer *timer; for (;;) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { @@ -373,7 +395,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { } if (grpc_timer_heap_is_empty(&shard->heap)) { if (now < shard->queue_deadline_cap) return NULL; - if (!refill_queue(shard, now)) return NULL; + if (!refill_heap(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { @@ -393,7 +415,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { } /* REQUIRES: shard->mu unlocked */ -static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, +static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, gpr_atm now, gpr_atm *new_min_deadline, grpc_error *error) { size_t n = 0; diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 520d4a3252..cb7998db97 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -50,6 +50,9 @@ static completed_thread *g_completed_threads; static bool g_kicked; // is there a thread waiting until the next timer should fire? static bool g_has_timed_waiter; +// the deadline of the current timed waiter thread (only relevant if +// g_has_timed_waiter is true) +static gpr_timespec g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; @@ -101,8 +104,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { start_timer_thread_and_unlock(); } else { // if there's no thread waiting with a timeout, kick an existing - // waiter - // so that the next deadline is not missed + // waiter so that the next deadline is not missed if (!g_has_timed_waiter) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "kick untimed waiter"); @@ -132,44 +134,79 @@ static bool wait_until(gpr_timespec next) { gpr_mu_unlock(&g_mu); return false; } - // if there's no timed waiter, we should become one: that waiter waits - // only until the next timer should expire - // all other timers wait forever - uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; - if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) { - g_has_timed_waiter = true; - // we use a generation counter to track the timed waiter so we can - // cancel an existing one quickly (and when it actually times out it'll - // figure stuff out instead of incurring a wakeup) - my_timed_waiter_generation = ++g_timed_waiter_generation; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", - wait_time.tv_sec, wait_time.tv_nsec); + + // If g_kicked is true at this point, it means there was a kick from the timer + // system that the timer-manager threads here missed. We cannot trust 'next' + // here any longer (since there might be an earlier deadline). So if g_kicked + // is true at this point, we should quickly exit this and get the next + // deadline from the timer system + + if (!g_kicked) { + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire. All other timers wait forever + // + // 'g_timed_waiter_generation' is a global generation counter. The idea here + // is that the thread becoming a timed-waiter increments and stores this + // global counter locally in 'my_timed_waiter_generation' before going to + // sleep. After waking up, if my_timed_waiter_generation == + // g_timed_waiter_generation, it can be sure that it was the timed_waiter + // thread (and that no other thread took over while this was asleep) + // + // Initialize my_timed_waiter_generation to some value that is NOT equal to + // g_timed_waiter_generation + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + + /* If there's no timed waiter, we should become one: that waiter waits only + until the next timer should expire. All other timer threads wait forever + unless their 'next' is earlier than the current timed-waiter's deadline + (in which case the thread with earlier 'next' takes over as the new timed + waiter) */ + if (gpr_time_cmp(next, inf_future) != 0) { + if (!g_has_timed_waiter || + (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) { + my_timed_waiter_generation = ++g_timed_waiter_generation; + g_has_timed_waiter = true; + g_timed_waiter_deadline = next; + + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_timespec wait_time = + gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", + wait_time.tv_sec, wait_time.tv_nsec); + } + } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline + next = inf_future; + } } - } else { - next = inf_future; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + + if (GRPC_TRACER_ON(grpc_timer_check_trace) && + gpr_time_cmp(next, inf_future) == 0) { gpr_log(GPR_DEBUG, "sleep until kicked"); } + + gpr_cv_wait(&g_cv_wait, &g_mu, next); + + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, + g_kicked); + } + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + } } - gpr_cv_wait(&g_cv_wait, &g_mu, next); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", - my_timed_waiter_generation == g_timed_waiter_generation, g_kicked); - } - // if this was the timed waiter, then we need to check timers, and flag - // that there's now no timed waiter... we'll look for a replacement if - // there's work to do after checking timers (code above) - if (my_timed_waiter_generation == g_timed_waiter_generation) { - g_has_timed_waiter = false; - } + // if this was a kick from the timer system, consume it (and don't stop // this thread yet) if (g_kicked) { grpc_timer_consume_kick(); g_kicked = false; } + gpr_mu_unlock(&g_mu); return true; } @@ -257,6 +294,9 @@ void grpc_timer_manager_init(void) { g_waiter_count = 0; g_completed_threads = NULL; + g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + start_threads(); } @@ -302,6 +342,7 @@ void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 8c747085bb..6cd558d123 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -462,6 +462,35 @@ static BIGNUM *bignum_from_base64(grpc_exec_ctx *exec_ctx, const char *b64) { return result; } +#if OPENSSL_VERSION_NUMBER < 0x10100000L + +// Provide compatibility across OpenSSL 1.02 and 1.1. +static int RSA_set0_key(RSA *r, BIGNUM *n, BIGNUM *e, BIGNUM *d) { + /* If the fields n and e in r are NULL, the corresponding input + * parameters MUST be non-NULL for n and e. d may be + * left NULL (in case only the public key is used). + */ + if ((r->n == NULL && n == NULL) || (r->e == NULL && e == NULL)) { + return 0; + } + + if (n != NULL) { + BN_free(r->n); + r->n = n; + } + if (e != NULL) { + BN_free(r->e); + r->e = e; + } + if (d != NULL) { + BN_free(r->d); + r->d = d; + } + + return 1; +} +#endif // OPENSSL_VERSION_NUMBER < 0x10100000L + static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, const char *kty) { const grpc_json *key_prop; @@ -478,21 +507,27 @@ static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, gpr_log(GPR_ERROR, "Could not create rsa key."); goto end; } + BIGNUM *tmp_n = NULL; + BIGNUM *tmp_e = NULL; for (key_prop = json->child; key_prop != NULL; key_prop = key_prop->next) { if (strcmp(key_prop->key, "n") == 0) { - rsa->n = + tmp_n = bignum_from_base64(exec_ctx, validate_string_field(key_prop, "n")); - if (rsa->n == NULL) goto end; + if (tmp_n == NULL) goto end; } else if (strcmp(key_prop->key, "e") == 0) { - rsa->e = + tmp_e = bignum_from_base64(exec_ctx, validate_string_field(key_prop, "e")); - if (rsa->e == NULL) goto end; + if (tmp_e == NULL) goto end; } } - if (rsa->e == NULL || rsa->n == NULL) { + if (tmp_e == NULL || tmp_n == NULL) { gpr_log(GPR_ERROR, "Missing RSA public key field."); goto end; } + if (!RSA_set0_key(rsa, tmp_n, tmp_e, NULL)) { + gpr_log(GPR_ERROR, "Cannot set RSA key from inputs."); + goto end; + } result = EVP_PKEY_new(); EVP_PKEY_set1_RSA(result, rsa); /* uprefs rsa. */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 58112b04b4..50a51b31cd 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -49,7 +49,6 @@ typedef struct { pollset_set so that work can progress when this call wants work to progress */ grpc_polling_entity *pollent; - grpc_transport_stream_op_batch op; gpr_atm security_context_set; gpr_mu security_context_mu; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; @@ -92,11 +91,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, size_t num_md, grpc_credentials_status status, const char *error_details) { - grpc_call_element *elem = (grpc_call_element *)user_data; + grpc_transport_stream_op_batch *batch = + (grpc_transport_stream_op_batch *)user_data; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; - grpc_transport_stream_op_batch *op = &calld->op; - grpc_metadata_batch *mdb; - size_t i; reset_auth_metadata_context(&calld->auth_md_context); grpc_error *error = GRPC_ERROR_NONE; if (status != GRPC_CREDENTIALS_OK) { @@ -108,9 +106,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); } else { GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); - GPR_ASSERT(op->send_initial_metadata); - mdb = op->payload->send_initial_metadata.send_initial_metadata; - for (i = 0; i < num_md; i++) { + GPR_ASSERT(batch->send_initial_metadata); + grpc_metadata_batch *mdb = + batch->payload->send_initial_metadata.send_initial_metadata; + for (size_t i = 0; i < num_md; i++) { add_error(&error, grpc_metadata_batch_add_tail( exec_ctx, mdb, &calld->md_links[i], @@ -120,9 +119,9 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, } } if (error == GRPC_ERROR_NONE) { - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } else { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error); } } @@ -158,11 +157,11 @@ void build_auth_metadata_context(grpc_security_connector *sc, static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = - (grpc_client_security_context *)op->payload + (grpc_client_security_context *)batch->payload ->context[GRPC_CONTEXT_SECURITY] .value; grpc_call_credentials *channel_call_creds = @@ -171,7 +170,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, if (channel_call_creds == NULL && !call_creds_has_md) { /* Skip sending metadata altogether. */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); return; } @@ -180,7 +179,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, ctx->creds, NULL); if (calld->creds == NULL) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, + exec_ctx, batch, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), @@ -194,28 +193,29 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, build_auth_metadata_context(&chand->security_connector->base, chand->auth_context, calld); - calld->op = *op; /* Copy op (originates from the caller's stack). */ GPR_ASSERT(calld->pollent != NULL); grpc_call_credentials_get_request_metadata( exec_ctx, calld->creds, calld->pollent, calld->auth_md_context, - on_credentials_metadata, elem); + on_credentials_metadata, batch); } static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, grpc_security_status status) { - grpc_call_element *elem = (grpc_call_element *)user_data; + grpc_transport_stream_op_batch *batch = + (grpc_transport_stream_op_batch *)user_data; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; if (status == GRPC_SECURITY_OK) { - send_security_metadata(exec_ctx, elem, &calld->op); + send_security_metadata(exec_ctx, elem, batch); } else { char *error_msg; char *host = grpc_slice_to_c_string(calld->host); gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", host); gpr_free(host); - grpc_call_element_signal_error( - exec_ctx, elem, + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, batch, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED)); @@ -223,35 +223,29 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, } } -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GPR_TIMER_BEGIN("auth_start_transport_op", 0); +static void auth_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + GPR_TIMER_BEGIN("auth_start_transport_stream_op_batch", 0); /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_linked_mdelem *l; - grpc_client_security_context *sec_ctx = NULL; - if (!op->cancel_stream) { + if (!batch->cancel_stream) { /* double checked lock over security context to ensure it's set once */ if (gpr_atm_acq_load(&calld->security_context_set) == 0) { gpr_mu_lock(&calld->security_context_mu); if (gpr_atm_acq_load(&calld->security_context_set) == 0) { - GPR_ASSERT(op->payload->context != NULL); - if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { - op->payload->context[GRPC_CONTEXT_SECURITY].value = + GPR_ASSERT(batch->payload->context != NULL); + if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + batch->payload->context[GRPC_CONTEXT_SECURITY].value = grpc_client_security_context_create(); - op->payload->context[GRPC_CONTEXT_SECURITY].destroy = + batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = grpc_client_security_context_destroy; } - sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; + grpc_client_security_context *sec_ctx = + batch->payload->context[GRPC_CONTEXT_SECURITY].value; GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); sec_ctx->auth_context = GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); @@ -261,9 +255,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, } } - if (op->send_initial_metadata) { - for (l = op->payload->send_initial_metadata.send_initial_metadata->list - .head; + if (batch->send_initial_metadata) { + for (grpc_linked_mdelem *l = batch->payload->send_initial_metadata + .send_initial_metadata->list.head; l != NULL; l = l->next) { grpc_mdelem md = l->md; /* Pointer comparison is OK for md_elems created from the same context. @@ -284,19 +278,19 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, } if (calld->have_host) { char *call_host = grpc_slice_to_c_string(calld->host); - calld->op = *op; /* Copy op (originates from the caller's stack). */ + batch->handler_private.extra_arg = elem; grpc_channel_security_connector_check_call_host( exec_ctx, chand->security_connector, call_host, chand->auth_context, - on_host_checked, elem); + on_host_checked, batch); gpr_free(call_host); - GPR_TIMER_END("auth_start_transport_op", 0); + GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); return; /* early exit */ } } /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("auth_start_transport_op", 0); + grpc_call_next_op(exec_ctx, elem, batch); + GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -379,7 +373,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_client_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, set_pollset_or_pollset_set, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"}; + auth_start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + set_pollset_or_pollset_set, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "client-auth"}; diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 3c0c24254b..29327107e5 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -383,8 +383,7 @@ static void fake_channel_add_handshakers( grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_adapter_handshaker( - tsi_create_fake_handshaker(true /* is_client */)), + exec_ctx, tsi_create_fake_handshaker(true /* is_client */), &sc->base)); } @@ -394,8 +393,7 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_adapter_handshaker( - tsi_create_fake_handshaker(false /* is_client */)), + exec_ctx, tsi_create_fake_handshaker(false /* is_client */), &sc->base)); } diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 4e6914be7b..9bf3f0ca0f 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -27,14 +27,9 @@ #include "src/core/lib/slice/slice_internal.h" typedef struct call_data { - grpc_metadata_batch *recv_initial_metadata; - /* Closure to call when finished with the auth_on_recv hook. */ - grpc_closure *on_done_recv; - /* Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member after - handling it. */ - grpc_closure auth_on_recv; - grpc_transport_stream_op_batch *transport_op; + grpc_transport_stream_op_batch *recv_initial_metadata_batch; + grpc_closure *original_recv_initial_metadata_ready; + grpc_closure recv_initial_metadata_ready; grpc_metadata_array md; const grpc_metadata *consumed_md; size_t num_consumed_md; @@ -90,125 +85,96 @@ static void on_md_processing_done( grpc_status_code status, const char *error_details) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; + grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - /* TODO(jboeuf): Implement support for response_md. */ if (response_md != NULL && num_response_md > 0) { gpr_log(GPR_INFO, "response_md in auth metadata processing not supported for now. " "Ignoring..."); } - + grpc_error *error = GRPC_ERROR_NONE; if (status == GRPC_STATUS_OK) { calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; - /* TODO(ctiller): propagate error */ - GRPC_LOG_IF_ERROR( - "grpc_metadata_batch_filter", - grpc_metadata_batch_filter(&exec_ctx, calld->recv_initial_metadata, - remove_consumed_md, elem, - "Response metadata filtering error")); - for (size_t i = 0; i < calld->md.count; i++) { - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); - } - grpc_metadata_array_destroy(&calld->md); - GRPC_CLOSURE_SCHED(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE); + error = grpc_metadata_batch_filter( + &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, + remove_consumed_md, elem, "Response metadata filtering error"); } else { - for (size_t i = 0; i < calld->md.count; i++) { - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); - } - grpc_metadata_array_destroy(&calld->md); - error_details = error_details != NULL - ? error_details - : "Authentication metadata processing failed."; - if (calld->transport_op->send_message) { - grpc_byte_stream_destroy( - &exec_ctx, calld->transport_op->payload->send_message.send_message); - calld->transport_op->payload->send_message.send_message = NULL; + if (error_details == NULL) { + error_details = "Authentication metadata processing failed."; } - GRPC_CLOSURE_SCHED( - &exec_ctx, calld->on_done_recv, + error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status)); + GRPC_ERROR_INT_GRPC_STATUS, status); } - + for (size_t i = 0; i < calld->md.count; i++) { + grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); + grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); + } + grpc_metadata_array_destroy(&calld->md); + GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready, + error); grpc_exec_ctx_finish(&exec_ctx); } -static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *error) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; if (error == GRPC_ERROR_NONE) { if (chand->creds != NULL && chand->creds->processor.process != NULL) { - calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata); + calld->md = metadata_batch_to_md_array( + batch->payload->recv_initial_metadata.recv_initial_metadata); chand->creds->processor.process( chand->creds->processor.state, calld->auth_context, calld->md.metadata, calld->md.count, on_md_processing_done, elem); return; } } - GRPC_CLOSURE_SCHED(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } -static void set_recv_ops_md_callbacks(grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static void auth_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; - - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->on_done_recv = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->auth_on_recv; - calld->transport_op = op; + if (batch->recv_initial_metadata) { + // Inject our callback. + calld->recv_initial_metadata_batch = batch; + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; } -} - -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - set_recv_ops_md_callbacks(elem, op); - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_server_security_context *server_ctx = NULL; - - /* initialize members */ - memset(calld, 0, sizeof(*calld)); - GRPC_CLOSURE_INIT(&calld->auth_on_recv, auth_on_recv, elem, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - + // Create server security context. Set its auth context from channel + // data and save it in the call context. + grpc_server_security_context *server_ctx = + grpc_server_security_context_create(); + server_ctx->auth_context = grpc_auth_context_create(chand->auth_context); + calld->auth_context = server_ctx->auth_context; if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) { args->context[GRPC_CONTEXT_SECURITY].destroy( args->context[GRPC_CONTEXT_SECURITY].value); } - - server_ctx = grpc_server_security_context_create(); - server_ctx->auth_context = grpc_auth_context_create(chand->auth_context); - calld->auth_context = server_ctx->auth_context; - args->context[GRPC_CONTEXT_SECURITY].value = server_ctx; args->context[GRPC_CONTEXT_SECURITY].destroy = grpc_server_security_context_destroy; - return GRPC_ERROR_NONE; } @@ -221,19 +187,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { + GPR_ASSERT(!args->is_last); + channel_data *chand = elem->channel_data; grpc_auth_context *auth_context = grpc_find_auth_context_in_args(args->channel_args); - grpc_server_credentials *creds = - grpc_find_server_credentials_in_args(args->channel_args); - /* grab pointers to our data from the channel element */ - channel_data *chand = elem->channel_data; - - GPR_ASSERT(!args->is_last); GPR_ASSERT(auth_context != NULL); - - /* initialize members */ chand->auth_context = GRPC_AUTH_CONTEXT_REF(auth_context, "server_auth_filter"); + grpc_server_credentials *creds = + grpc_find_server_credentials_in_args(args->channel_args); chand->creds = grpc_server_credentials_ref(creds); return GRPC_ERROR_NONE; } @@ -241,14 +203,13 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "server_auth_filter"); grpc_server_credentials_unref(exec_ctx, chand->creds); } const grpc_channel_filter grpc_server_auth_filter = { - auth_start_transport_op, + auth_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index b433c61b4c..9e0f73ae3d 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -38,7 +38,7 @@ struct gpr_arena { gpr_arena *gpr_arena_create(size_t initial_size) { initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); - gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); + gpr_arena *a = (gpr_arena *)gpr_zalloc(sizeof(gpr_arena) + initial_size); a->initial_zone.size_end = initial_size; return a; } @@ -64,7 +64,7 @@ void *gpr_arena_alloc(gpr_arena *arena, size_t size) { zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm); if (next_z == NULL) { size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far); - next_z = gpr_zalloc(sizeof(zone) + next_z_size); + next_z = (zone *)gpr_zalloc(sizeof(zone) + next_z_size); next_z->size_begin = z->size_end; next_z->size_end = z->size_end + next_z_size; if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) { diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c index caa0bafe33..2f37d62f76 100644 --- a/src/core/lib/support/atm.c +++ b/src/core/lib/support/atm.c @@ -21,12 +21,12 @@ gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, gpr_atm min, gpr_atm max) { - gpr_atm current; - gpr_atm new; + gpr_atm current_value; + gpr_atm new_value; do { - current = gpr_atm_no_barrier_load(value); - new = GPR_CLAMP(current + delta, min, max); - if (new == current) break; - } while (!gpr_atm_no_barrier_cas(value, current, new)); - return new; + current_value = gpr_atm_no_barrier_load(value); + new_value = GPR_CLAMP(current_value + delta, min, max); + if (new_value == current_value) break; + } while (!gpr_atm_no_barrier_cas(value, current_value, new_value)); + return new_value; } diff --git a/src/core/lib/support/avl.c b/src/core/lib/support/avl.c index aa0f665272..a6178fdbce 100644 --- a/src/core/lib/support/avl.c +++ b/src/core/lib/support/avl.c @@ -76,7 +76,7 @@ static gpr_avl_node *assert_invariants(gpr_avl_node *n) { return n; } gpr_avl_node *new_node(void *key, void *value, gpr_avl_node *left, gpr_avl_node *right) { - gpr_avl_node *node = gpr_malloc(sizeof(*node)); + gpr_avl_node *node = (gpr_avl_node *)gpr_malloc(sizeof(*node)); gpr_ref_init(&node->refs, 1); node->key = key; node->value = value; diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c index 5c512661a3..61d2346427 100644 --- a/src/core/lib/support/log_linux.c +++ b/src/core/lib/support/log_linux.c @@ -64,6 +64,8 @@ void gpr_default_log(gpr_log_func_args *args) { time_t timer; gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); struct tm tm; + static __thread long tid = 0; + if (tid == 0) tid = gettid(); timer = (time_t)now.tv_sec; final_slash = strrchr(args->file, '/'); @@ -81,7 +83,7 @@ void gpr_default_log(gpr_log_func_args *args) { gpr_asprintf(&prefix, "%s%s.%09" PRId32 " %7ld %s:%d]", gpr_log_severity_string(args->severity), time_buffer, - now.tv_nsec, gettid(), display_file, args->line); + now.tv_nsec, tid, display_file, args->line); fprintf(stderr, "%-60s %s\n", prefix, args->message); gpr_free(prefix); diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c index 58c4c435d3..e9f893988d 100644 --- a/src/core/lib/support/mpscq.c +++ b/src/core/lib/support/mpscq.c @@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) { GPR_ASSERT(q->tail == &q->stub); } -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_mpscq_node *prev = (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n); - return prev == &q->stub; } gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { @@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) { *empty = false; return NULL; } - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q) { - gpr_mpscq_init(&q->queue); - q->read_lock = GPR_SPINLOCK_INITIALIZER; -} - -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) { - gpr_mpscq_destroy(&q->queue); -} - -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) { - return gpr_mpscq_push(&q->queue, n); -} - -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) { - if (gpr_spinlock_trylock(&q->read_lock)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue); - gpr_spinlock_unlock(&q->read_lock); - return n; - } - return NULL; -} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 2f4739d7f8..daa51768f7 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -22,7 +22,6 @@ #include <grpc/support/atm.h> #include <stdbool.h> #include <stddef.h> -#include "src/core/lib/support/spinlock.h" // Multiple-producer single-consumer lock free queue, based upon the // implementation from Dmitry Vyukov here: @@ -44,34 +43,11 @@ typedef struct gpr_mpscq { void gpr_mpscq_init(gpr_mpscq *q); void gpr_mpscq_destroy(gpr_mpscq *q); // Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); // Pop a node (returns NULL if no node is ready - which doesn't indicate that // the queue is empty!!) -// Thread compatible - can only be called from one thread at a time gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); // Pop a node; sets *empty to true if the queue is empty, or false if it is not gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty); -// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing -// only one thread will succeed concurrently -typedef struct gpr_locked_mpscq { - gpr_mpscq queue; - gpr_spinlock read_lock; -} gpr_locked_mpscq; - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q); -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q); -// Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n); -// Pop a node (returns NULL if no node is ready - which doesn't indicate that -// the queue is empty!!) -// Thread safe - can be called from multiple threads concurrently -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q); - #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c new file mode 100644 index 0000000000..0fb64ed001 --- /dev/null +++ b/src/core/lib/support/stack_lockfree.c @@ -0,0 +1,137 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/stack_lockfree.h" + +#include <stdlib.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +/* The lockfree node structure is a single architecture-level + word that allows for an atomic CAS to set it up. */ +struct lockfree_node_contents { + /* next thing to look at. Actual index for head, next index otherwise */ + uint16_t index; +#ifdef GPR_ARCH_64 + uint16_t pad; + uint32_t aba_ctr; +#else +#ifdef GPR_ARCH_32 + uint16_t aba_ctr; +#else +#error Unsupported bit width architecture +#endif +#endif +}; + +/* Use a union to make sure that these are in the same bits as an atm word */ +typedef union lockfree_node { + gpr_atm atm; + struct lockfree_node_contents contents; +} lockfree_node; + +/* make sure that entries aligned to 8-bytes */ +#define ENTRY_ALIGNMENT_BITS 3 +/* reserve this entry as invalid */ +#define INVALID_ENTRY_INDEX ((1 << 16) - 1) + +struct gpr_stack_lockfree { + lockfree_node *entries; + lockfree_node head; /* An atomic entry describing curr head */ +}; + +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) { + gpr_stack_lockfree *stack; + stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack)); + /* Since we only allocate 16 bits to represent an entry number, + * make sure that we are within the desired range */ + /* Reserve the highest entry number as a dummy */ + GPR_ASSERT(entries < INVALID_ENTRY_INDEX); + stack->entries = (lockfree_node *)gpr_malloc_aligned( + entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS); + /* Clear out all entries */ + memset(stack->entries, 0, entries * sizeof(stack->entries[0])); + memset(&stack->head, 0, sizeof(stack->head)); + + GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents)); + + /* Point the head at reserved dummy entry */ + stack->head.contents.index = INVALID_ENTRY_INDEX; +/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */ +#ifdef GPR_ARCH_64 + stack->head.contents.pad = 0; +#endif + stack->head.contents.aba_ctr = 0; + return stack; +} + +void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) { + gpr_free_aligned(stack->entries); + gpr_free(stack); +} + +int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { + lockfree_node head; + lockfree_node newhead; + lockfree_node curent; + lockfree_node newent; + + /* First fill in the entry's index and aba ctr for new head */ + newhead.contents.index = (uint16_t)entry; +#ifdef GPR_ARCH_64 + /* Fill in the pad to avoid confusing memcheck tools */ + newhead.contents.pad = 0; +#endif + + /* Also post-increment the aba_ctr */ + curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newhead.contents.aba_ctr = ++curent.contents.aba_ctr; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm); + + do { + /* Atomically get the existing head value for use */ + head.atm = gpr_atm_no_barrier_load(&(stack->head.atm)); + /* Point to it */ + newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newent.contents.index = head.contents.index; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm); + } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); + /* Use rel_cas above to make sure that entry index is set properly */ + return head.contents.index == INVALID_ENTRY_INDEX; +} + +int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { + lockfree_node head; + lockfree_node newhead; + + do { + head.atm = gpr_atm_acq_load(&(stack->head.atm)); + if (head.contents.index == INVALID_ENTRY_INDEX) { + return -1; + } + newhead.atm = + gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm)); + + } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm)); + + return head.contents.index; +} diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h new file mode 100644 index 0000000000..6324211b72 --- /dev/null +++ b/src/core/lib/support/stack_lockfree.h @@ -0,0 +1,38 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H +#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H + +#include <stddef.h> + +typedef struct gpr_stack_lockfree gpr_stack_lockfree; + +/* This stack must specify the maximum number of entries to track. + The current implementation only allows up to 65534 entries */ +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries); +void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack); + +/* Pass in a valid entry number for the next stack entry */ +/* Returns 1 if this is the first element on the stack, 0 otherwise */ +int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry); + +/* Returns -1 on empty or the actual entry number */ +int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack); + +#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */ diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index c769866ceb..aac443ed06 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1504,7 +1504,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } /* TODO(ctiller): just make these the same variable? */ - call->metadata_batch[0][0].deadline = call->send_deadline; + if (call->is_client) { + call->metadata_batch[0][0].deadline = call->send_deadline; + } stream_op_payload->send_initial_metadata.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->send_initial_metadata.send_initial_metadata_flags = diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b04aee6c73..14e55eda85 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -189,16 +189,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; - size_t (*size)(); - void (*begin_op)(grpc_completion_queue *cc, void *tag); - void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, + size_t data_size; + void (*init)(void *data); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); + void (*destroy)(void *data); + void (*begin_op)(grpc_completion_queue *cq, void *tag); + void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); - grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline, + grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); - grpc_event (*pluck)(grpc_completion_queue *cc, void *tag, + grpc_event (*pluck)(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved); } cq_vtable; @@ -218,25 +221,28 @@ typedef struct grpc_cq_event_queue { gpr_atm num_queue_items; } grpc_cq_event_queue; -/* TODO: sreek Refactor this based on the completion_type. Put completion-type - * specific data in a different structure (and co-allocate memory for it along - * with completion queue + pollset )*/ -typedef struct cq_data { - gpr_mu *mu; +typedef struct cq_next_data { + /** Completed events for completion-queues of type GRPC_CQ_NEXT */ + grpc_cq_event_queue queue; + /** Counter of how many things have ever been queued on this completion queue + useful for avoiding locks to check the queue */ + gpr_atm things_queued_ever; + + /* Number of outstanding events (+1 if not shut down) */ + gpr_atm pending_events; + + int shutdown_called; +} cq_next_data; + +typedef struct cq_pluck_data { /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; - /** Completed events for completion-queues of type GRPC_CQ_NEXT */ - grpc_cq_event_queue queue; - /** Number of pending events (+1 if we're not shutdown) */ gpr_refcount pending_events; - /** Once owning_refs drops to zero, we will destroy the cq */ - gpr_refcount owning_refs; - /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; @@ -245,37 +251,45 @@ typedef struct cq_data { gpr_atm shutdown; int shutdown_called; - int is_server_cq; - int num_pluckers; - int num_polls; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; - grpc_closure pollset_shutdown_done; +} cq_pluck_data; + +/* Completion queue structure */ +struct grpc_completion_queue { + /** Once owning_refs drops to zero, we will destroy the cq */ + gpr_refcount owning_refs; + + gpr_mu *mu; + + const cq_vtable *vtable; + const cq_poller_vtable *poller_vtable; #ifndef NDEBUG void **outstanding_tags; size_t outstanding_tag_count; size_t outstanding_tag_capacity; #endif -} cq_data; -/* Completion queue structure */ -struct grpc_completion_queue { - cq_data data; - const cq_vtable *vtable; - const cq_poller_vtable *poller_vtable; + grpc_closure pollset_shutdown_done; + int num_polls; }; /* Forward declarations */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc); - -static size_t cq_size(grpc_completion_queue *cc); - -static void cq_begin_op(grpc_completion_queue *cc, void *tag); +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); + +static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -283,39 +297,51 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage); static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); -static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, +static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); -static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, +static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved); +static void cq_init_next(void *data); +static void cq_init_pluck(void *data); +static void cq_destroy_next(void *data); +static void cq_destroy_pluck(void *data); + /* Completion queue vtables based on the completion-type */ static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ - {.cq_completion_type = GRPC_CQ_NEXT, - .size = cq_size, - .begin_op = cq_begin_op, + {.data_size = sizeof(cq_next_data), + .cq_completion_type = GRPC_CQ_NEXT, + .init = cq_init_next, + .shutdown = cq_shutdown_next, + .destroy = cq_destroy_next, + .begin_op = cq_begin_op_for_next, .end_op = cq_end_op_for_next, .next = cq_next, .pluck = NULL}, /* GRPC_CQ_PLUCK */ - {.cq_completion_type = GRPC_CQ_PLUCK, - .size = cq_size, - .begin_op = cq_begin_op, + {.data_size = sizeof(cq_pluck_data), + .cq_completion_type = GRPC_CQ_PLUCK, + .init = cq_init_pluck, + .shutdown = cq_shutdown_pluck, + .destroy = cq_destroy_pluck, + .begin_op = cq_begin_op_for_pluck, .end_op = cq_end_op_for_pluck, .next = NULL, .pluck = cq_pluck}, }; -#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) -#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1) +#define DATA_FROM_CQ(cq) ((void *)(cq + 1)) +#define POLLSET_FROM_CQ(cq) \ + ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq))) grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true); grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); @@ -329,7 +355,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); gpr_free(_ev); \ } -static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, +static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, grpc_error *error); static void cq_event_queue_init(grpc_cq_event_queue *q) { @@ -342,9 +368,9 @@ static void cq_event_queue_destroy(grpc_cq_event_queue *q) { gpr_mpscq_destroy(&q->queue); } -static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { +static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c); - gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1); + return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0; } static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { @@ -367,16 +393,10 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) { return (long)gpr_atm_no_barrier_load(&q->num_queue_items); } -static size_t cq_size(grpc_completion_queue *cc) { - /* Size of the completion queue and the size of the pollset whose memory is - allocated right after that of completion queue */ - return sizeof(grpc_completion_queue) + cc->poller_vtable->size(); -} - grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type) { - grpc_completion_queue *cc; + grpc_completion_queue *cq; GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0); @@ -389,158 +409,173 @@ grpc_completion_queue *grpc_completion_queue_create_internal( const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); - cq_data *cqd = &cc->data; + cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + + poller_vtable->size()); - cc->vtable = vtable; - cc->poller_vtable = poller_vtable; + cq->vtable = vtable; + cq->poller_vtable = poller_vtable; - poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu); + /* One for destroy(), one for pollset_shutdown */ + gpr_ref_init(&cq->owning_refs, 2); -#ifndef NDEBUG - cqd->outstanding_tags = NULL; - cqd->outstanding_tag_capacity = 0; -#endif + poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); + vtable->init(DATA_FROM_CQ(cq)); + + GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, + grpc_schedule_on_exec_ctx); + + GPR_TIMER_END("grpc_completion_queue_create_internal", 0); + + return cq; +} +static void cq_init_next(void *ptr) { + cq_next_data *cqd = ptr; + /* Initial ref is dropped by grpc_completion_queue_shutdown */ + gpr_atm_no_barrier_store(&cqd->pending_events, 1); + cqd->shutdown_called = false; + gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); + cq_event_queue_init(&cqd->queue); +} + +static void cq_destroy_next(void *ptr) { + cq_next_data *cqd = ptr; + GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0); + cq_event_queue_destroy(&cqd->queue); +} + +static void cq_init_pluck(void *ptr) { + cq_pluck_data *cqd = ptr; /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cqd->pending_events, 1); - /* One for destroy(), one for pollset_shutdown */ - gpr_ref_init(&cqd->owning_refs, 2); cqd->completed_tail = &cqd->completed_head; cqd->completed_head.next = (uintptr_t)cqd->completed_tail; gpr_atm_no_barrier_store(&cqd->shutdown, 0); cqd->shutdown_called = 0; - cqd->is_server_cq = 0; cqd->num_pluckers = 0; - cqd->num_polls = 0; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); -#ifndef NDEBUG - cqd->outstanding_tag_count = 0; -#endif - cq_event_queue_init(&cqd->queue); - GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc, - grpc_schedule_on_exec_ctx); - - GPR_TIMER_END("grpc_completion_queue_create_internal", 0); +} - return cc; +static void cq_destroy_pluck(void *ptr) { + cq_pluck_data *cqd = ptr; + GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); } -grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { - return cc->vtable->cq_completion_type; +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) { + return cq->vtable->cq_completion_type; } -int grpc_get_cq_poll_num(grpc_completion_queue *cc) { +int grpc_get_cq_poll_num(grpc_completion_queue *cq) { int cur_num_polls; - gpr_mu_lock(cc->data.mu); - cur_num_polls = cc->data.num_polls; - gpr_mu_unlock(cc->data.mu); + gpr_mu_lock(cq->mu); + cur_num_polls = cq->num_polls; + gpr_mu_unlock(cq->mu); return cur_num_polls; } #ifndef NDEBUG -void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, +void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason, const char *file, int line) { - cq_data *cqd = &cc->data; if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { - gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count); + gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1, + "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1, reason); } #else -void grpc_cq_internal_ref(grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; +void grpc_cq_internal_ref(grpc_completion_queue *cq) { #endif - gpr_ref(&cqd->owning_refs); + gpr_ref(&cq->owning_refs); } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_completion_queue *cc = arg; - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy"); + grpc_completion_queue *cq = arg; + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy"); } #ifndef NDEBUG -void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, const char *reason, const char *file, int line) { - cq_data *cqd = &cc->data; if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { - gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count); + gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1, + "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1, reason); } #else void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; + grpc_completion_queue *cq) { #endif - if (gpr_unref(&cqd->owning_refs)) { - GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); - cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc)); - cq_event_queue_destroy(&cqd->queue); + if (gpr_unref(&cq->owning_refs)) { + cq->vtable->destroy(DATA_FROM_CQ(cq)); + cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq)); #ifndef NDEBUG - gpr_free(cqd->outstanding_tags); + gpr_free(cq->outstanding_tags); #endif - gpr_free(cc); + gpr_free(cq); } } -static void cq_begin_op(grpc_completion_queue *cc, void *tag) { - cq_data *cqd = &cc->data; -#ifndef NDEBUG - gpr_mu_lock(cqd->mu); +static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + GPR_ASSERT(!cqd->shutdown_called); + gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); +} + +static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); - if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) { - cqd->outstanding_tag_capacity = - GPR_MAX(4, 2 * cqd->outstanding_tag_capacity); - cqd->outstanding_tags = - gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) * - cqd->outstanding_tag_capacity); - } - cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag; - gpr_mu_unlock(cqd->mu); -#endif gpr_ref(&cqd->pending_events); } -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { - cc->vtable->begin_op(cc, tag); +void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG + gpr_mu_lock(cq->mu); + if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { + cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); + cq->outstanding_tags = + gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * + cq->outstanding_tag_capacity); + } + cq->outstanding_tags[cq->outstanding_tag_count++] = tag; + gpr_mu_unlock(cq->mu); +#endif + cq->vtable->begin_op(cq, tag); } #ifndef NDEBUG -static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) { - cq_data *cqd = &cc->data; +static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { int found = 0; if (lock_cq) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); } - for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) { - if (cqd->outstanding_tags[i] == tag) { - cqd->outstanding_tag_count--; - GPR_SWAP(void *, cqd->outstanding_tags[i], - cqd->outstanding_tags[cqd->outstanding_tag_count]); + for (int i = 0; i < (int)cq->outstanding_tag_count; i++) { + if (cq->outstanding_tags[i] == tag) { + cq->outstanding_tag_count--; + GPR_SWAP(void *, cq->outstanding_tags[i], + cq->outstanding_tags[cq->outstanding_tag_count]); found = 1; break; } } if (lock_cq) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } GPR_ASSERT(found); } #else -static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {} +static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif -/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a + * completion * type of GRPC_CQ_NEXT) */ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -553,16 +588,16 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } - cq_data *cqd = &cc->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); storage->tag = tag; @@ -570,28 +605,42 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, storage->done_arg = done_arg; storage->next = (uintptr_t)(is_success); - cq_check_tag(cc, tag, true); /* Used in debug builds only */ + cq_check_tag(cq, tag, true); /* Used in debug builds only */ /* Add the completion to the queue */ - cq_event_queue_push(&cqd->queue, storage); + bool is_first = cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - - gpr_mu_lock(cqd->mu); - - int shutdown = gpr_unref(&cqd->pending_events); - if (!shutdown) { - grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); - gpr_mu_unlock(cqd->mu); - - if (kick_error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); - - GRPC_ERROR_UNREF(kick_error); + bool will_definitely_shutdown = + gpr_atm_no_barrier_load(&cqd->pending_events) == 1; + + if (!will_definitely_shutdown) { + /* Only kick if this is the first item queued */ + if (is_first) { + gpr_mu_lock(cq->mu); + grpc_error *kick_error = + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); + + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + GRPC_ERROR_UNREF(kick_error); + } + } + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } } else { - cq_finish_shutdown(exec_ctx, cc); - gpr_mu_unlock(cqd->mu); + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); @@ -599,16 +648,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a + * completion * type of GRPC_CQ_PLUCK) */ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); @@ -618,9 +668,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); @@ -632,8 +682,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, storage->done_arg = done_arg; storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); - gpr_mu_lock(cqd->mu); - cq_check_tag(cc, tag, false); /* Used in debug builds only */ + gpr_mu_lock(cq->mu); + cq_check_tag(cq, tag, false); /* Used in debug builds only */ /* Add to the list of completions */ gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); @@ -652,9 +702,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, } grpc_error *kick_error = - cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -663,8 +713,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } } else { - cq_finish_shutdown(exec_ctx, cc); - gpr_mu_unlock(cqd->mu); + cq_finish_shutdown_pluck(exec_ctx, cq); + gpr_mu_unlock(cq->mu); } GPR_TIMER_END("cq_end_op_for_pluck", 0); @@ -672,12 +722,12 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, +void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage); + cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage); } typedef struct { @@ -692,7 +742,7 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; - cq_data *cqd = &cq->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -703,7 +753,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { gpr_atm_no_barrier_load(&cqd->things_queued_ever); /* Pop a cq_completion from the queue. Returns NULL if the queue is empty - * might return NULL in some cases even if the queue is not empty; but that + * might return NULL in some cases even if the queue is not empty; but + * that * is ok and doesn't affect correctness. Might effect the tail latencies a * bit) */ a->stolen_completion = cq_event_queue_pop(&cqd->queue); @@ -716,58 +767,56 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { } #ifndef NDEBUG -static void dump_pending_tags(grpc_completion_queue *cc) { +static void dump_pending_tags(grpc_completion_queue *cq) { if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return; - cq_data *cqd = &cc->data; - gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); - gpr_mu_lock(cqd->mu); - for (size_t i = 0; i < cqd->outstanding_tag_count; i++) { + gpr_mu_lock(cq->mu); + for (size_t i = 0; i < cq->outstanding_tag_count; i++) { char *s; - gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]); + gpr_asprintf(&s, " %p", cq->outstanding_tags[i]); gpr_strvec_add(&v, s); } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); gpr_free(out); } #else -static void dump_pending_tags(grpc_completion_queue *cc) {} +static void dump_pending_tags(grpc_completion_queue *cq) {} #endif -static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, +static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; gpr_timespec now; - cq_data *cqd = &cc->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); GRPC_API_TRACE( "grpc_completion_queue_next(" - "cc=%p, " + "cq=%p, " "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, + 5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, reserved)); GPR_ASSERT(!reserved); - dump_pending_tags(cc); + dump_pending_tags(cq); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cc, "next"); + GRPC_CQ_INTERNAL_REF(cq, "next"); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), - .cq = cc, + .cq = cq, .deadline = deadline, .stolen_completion = NULL, .tag = NULL, @@ -800,21 +849,24 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, /* If c == NULL it means either the queue is empty OR in an transient inconsistent state. If it is the latter, we shold do a 0-timeout poll so that the thread comes back quickly from poll to make a second - attempt at popping. Not doing this can potentially deadlock this thread + attempt at popping. Not doing this can potentially deadlock this + thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); } } - if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ if (cq_event_queue_num_items(&cqd->queue) > 0) { /* Go to the beginning of the loop. No point doing a poll because - (cc->shutdown == true) is only possible when there is no pending work - (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion + (cq->shutdown == true) is only possible when there is no pending + work + (i.e cq->pending_events == 0) and any outstanding + grpc_cq_completion events are already queued on this cq */ continue; } @@ -828,16 +880,16 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } /* The main polling work happens in grpc_pollset_work */ - gpr_mu_lock(cqd->mu); - cqd->num_polls++; - grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + gpr_mu_lock(cq->mu); + cq->num_polls++; + grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), NULL, now, iteration_deadline); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); @@ -846,30 +898,74 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } is_finished_arg.first_loop = false; } - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next"); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); + if (cq_event_queue_num_items(&cqd->queue) > 0 && + gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { + gpr_mu_lock(cq->mu); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); + } + GPR_TIMER_END("grpc_completion_queue_next", 0); return ret; } -grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling + this function */ +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); + + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); +} + +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); + return; + } + cqd->shutdown_called = 1; + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_next(exec_ctx, cq); + } + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); +} + +grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { - return cc->vtable->next(cc, deadline, reserved); + return cq->vtable->next(cq, deadline, reserved); } -static int add_plucker(grpc_completion_queue *cc, void *tag, +static int add_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -879,9 +975,9 @@ static int add_plucker(grpc_completion_queue *cc, void *tag, return 1; } -static void del_plucker(grpc_completion_queue *cc, void *tag, +static void del_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { cqd->num_pluckers--; @@ -895,13 +991,13 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; - cq_data *cqd = &cq->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); grpc_cq_completion *c; @@ -913,51 +1009,51 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { if (c == cqd->completed_tail) { cqd->completed_tail = prev; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); a->stolen_completion = c; return true; } prev = c; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; } -static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, +static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" - "cc=%p, tag=%p, " + "cq=%p, tag=%p, " "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec, + 6, (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, reserved)); } GPR_ASSERT(!reserved); - dump_pending_tags(cc); + dump_pending_tags(cq); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(cqd->mu); + GRPC_CQ_INTERNAL_REF(cq, "pluck"); + gpr_mu_lock(cq->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), - .cq = cc, + .cq = cq, .deadline = deadline, .stolen_completion = NULL, .tag = tag, @@ -966,7 +1062,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -983,7 +1079,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, if (c == cqd->completed_tail) { cqd->completed_tail = prev; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -993,54 +1089,54 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, prev = c; } if (gpr_atm_no_barrier_load(&cqd->shutdown)) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } - if (!add_plucker(cc, tag, &worker)) { + if (!add_plucker(cq, tag, &worker)) { gpr_log(GPR_DEBUG, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { - del_plucker(cc, tag, &worker); - gpr_mu_unlock(cqd->mu); + del_plucker(cq, tag, &worker); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } - cqd->num_polls++; - grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + cq->num_polls++; + grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), &worker, now, deadline); if (err != GRPC_ERROR_NONE) { - del_plucker(cc, tag, &worker); - gpr_mu_unlock(cqd->mu); + del_plucker(cq, tag, &worker); + gpr_mu_unlock(cq->mu); const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg); GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } is_finished_arg.first_loop = false; - del_plucker(cc, tag, &worker); + del_plucker(cq, tag, &worker); } done: - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck"); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); @@ -1049,85 +1145,66 @@ done: return ret; } -grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, +grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) { - return cc->vtable->pluck(cc, tag, deadline, reserved); + return cq->vtable->pluck(cq, tag, deadline, reserved); } -/* Finishes the completion queue shutdown. This means that there are no more - completion events / tags expected from the completion queue - - Must be called under completion queue lock - - Must be called only once in completion queue's lifetime - - grpc_completion_queue_shutdown() MUST have been called before calling - this function */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; +static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); gpr_atm_no_barrier_store(&cqd->shutdown, 1); - cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cqd->pollset_shutdown_done); + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); } -/* Shutdown simply drops a ref that we reserved at creation time; if we drop - to zero here, then enter shutdown mode and wake up any waiters */ -void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); - GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - cq_data *cqd = &cc->data; +static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } cqd->shutdown_called = 1; if (gpr_unref(&cqd->pending_events)) { - cq_finish_shutdown(&exec_ctx, cc); + cq_finish_shutdown_pluck(exec_ctx, cq); } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); +} + +/* Shutdown simply drops a ref that we reserved at creation time; if we drop + to zero here, then enter shutdown mode and wake up any waiters */ +void grpc_completion_queue_shutdown(grpc_completion_queue *cq) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); + GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); + cq->vtable->shutdown(&exec_ctx, cq); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } -void grpc_completion_queue_destroy(grpc_completion_queue *cc) { - GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); +void grpc_completion_queue_destroy(grpc_completion_queue *cq) { + GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq)); GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); - grpc_completion_queue_shutdown(cc); - - /* TODO (sreek): This should not ideally be here. Refactor it into the - * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ - if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { - GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); - } + grpc_completion_queue_shutdown(cq); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy"); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } -grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; -} - -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { - return CQ_FROM_POLLSET(ps); -} - -void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { - cc->data.is_server_cq = 1; -} - -bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { - return cc->data.is_server_cq; +grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) { + return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL; } -bool grpc_cq_can_listen(grpc_completion_queue *cc) { - return cc->poller_vtable->can_listen; +bool grpc_cq_can_listen(grpc_completion_queue *cq) { + return cq->poller_vtable->can_listen; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 97ea9cae20..af44482513 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -84,10 +84,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_server_cq(grpc_completion_queue *cc); bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 84ddf74ab9..de0a91e2b2 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -32,8 +32,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/mpscq.h" -#include "src/core/lib/support/spinlock.h" +#include "src/core/lib/support/stack_lockfree.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); typedef struct requested_call { - gpr_mpscq_node request_link; /* must be first */ requested_call_type type; size_t cq_idx; void *tag; @@ -162,7 +160,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_locked_mpscq *requests_per_cq; + gpr_stack_lockfree **requests_per_cq; }; struct registered_method { @@ -207,6 +205,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; + /** requested call backing data */ + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher *rm, grpc_server *server) { +static void request_matcher_init(request_matcher *rm, size_t entries, + grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; rm->requests_per_cq = gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&rm->requests_per_cq[i]); + rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); } } static void request_matcher_destroy(request_matcher *rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); - gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); + GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); + gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); } gpr_free(rm->requests_per_cq); } @@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm, grpc_error *error) { - requested_call *rc; + int request_id; for (size_t i = 0; i < server->cq_count; i++) { - /* Here we know: - 1. no requests are being added (since the server is shut down) - 2. no other threads are pulling (since the shut down process is single - threaded) - So, we can ignore the queue lock and just pop, with the guarantee that a - NULL returned here truly means that the queue is empty */ - while ((rc = (requested_call *)gpr_mpscq_pop( - &rm->requests_per_cq[i].queue)) != NULL) { - fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); + while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != + -1) { + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], + GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -452,7 +458,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, grpc_cq_completion *c) { - gpr_free(req); + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); + } else { + gpr_free(req); + } + + server_unref(exec_ctx, server); } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_UNREACHABLE_CODE(return ); } + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + channel_data *chand = elem->channel_data; + server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); } @@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, for (size_t i = 0; i < server->cq_count; i++) { size_t cq_idx = (chand->cq_idx + i) % server->cq_count; - requested_call *rc = - (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) { + int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) { continue; } else { gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -951,8 +975,6 @@ static void register_completion_queue(grpc_server *server, if (server->cqs[i] == cq) return; } - grpc_cq_mark_server_cq(cq); - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -992,6 +1014,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + /* TODO(ctiller): expose a channel_arg for this */ + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1064,15 +1088,29 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } - request_matcher_init(&server->unregistered_request_matcher, server); + request_matcher_init(&server->unregistered_request_matcher, + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } server_ref(server); @@ -1116,9 +1154,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, chand->channel = channel; size_t cq_idx; - grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { - if (s->cqs[cq_idx] == accepting_cq) break; + if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break; } if (cq_idx == s->cq_count) { /* completion queue not found: pick a random one to publish new calls to */ @@ -1326,11 +1363,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; + int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); + if (request_id == -1) { + /* out of request ids: just fail this one */ + fail_call(exec_ctx, server, cq_idx, rc, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), + GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); + return GRPC_CALL_OK; + } switch (rc->type) { case BATCH_CALL: rm = &server->unregistered_request_matcher; @@ -1339,13 +1386,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { + server->requested_calls_per_cq[cq_idx][request_id] = *rc; + gpr_free(rc); + if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) break; + request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -1361,7 +1410,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1468,6 +1518,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); + server_ref(server); grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); } diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c index 404c240589..2388f19f81 100644 --- a/src/core/lib/transport/static_metadata.c +++ b/src/core/lib/transport/static_metadata.c @@ -464,7 +464,8 @@ grpc_mdelem grpc_static_mdelem_for_static_strings(int a, int b) { if (a == -1 || b == -1) return GRPC_MDNULL; uint32_t k = (uint32_t)(a * 99 + b); uint32_t h = elems_phash(k); - return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k + return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k && + elem_idxs[h] != 255 ? GRPC_MAKE_MDELEM(&grpc_static_mdelem_table[elem_idxs[h]], GRPC_MDELEM_STORAGE_STATIC) : GRPC_MDNULL; diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index 1e919c4cce..1280680663 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -31,6 +31,7 @@ #define TSI_FAKE_FRAME_HEADER_SIZE 4 #define TSI_FAKE_FRAME_INITIAL_ALLOCATED_SIZE 64 #define TSI_FAKE_DEFAULT_FRAME_SIZE 16384 +#define TSI_FAKE_HANDSHAKER_OUTGOING_BUFFER_INITIAL_SIZE 256 /* --- Structure definitions. ---*/ @@ -59,8 +60,10 @@ typedef struct { int is_client; tsi_fake_handshake_message next_message_to_send; int needs_incoming_message; - tsi_fake_frame incoming; - tsi_fake_frame outgoing; + tsi_fake_frame incoming_frame; + tsi_fake_frame outgoing_frame; + unsigned char *outgoing_bytes_buffer; + size_t outgoing_bytes_buffer_size; tsi_result result; } tsi_fake_handshaker; @@ -116,27 +119,23 @@ static void tsi_fake_frame_reset(tsi_fake_frame *frame, int needs_draining) { if (!needs_draining) frame->size = 0; } -/* Returns 1 if successful, 0 otherwise. */ -static int tsi_fake_frame_ensure_size(tsi_fake_frame *frame) { +/* Checks if the frame's allocated size is at least frame->size, and reallocs + * more memory if necessary. */ +static void tsi_fake_frame_ensure_size(tsi_fake_frame *frame) { if (frame->data == NULL) { frame->allocated_size = frame->size; frame->data = gpr_malloc(frame->allocated_size); - if (frame->data == NULL) return 0; } else if (frame->size > frame->allocated_size) { unsigned char *new_data = gpr_realloc(frame->data, frame->size); - if (new_data == NULL) { - gpr_free(frame->data); - frame->data = NULL; - return 0; - } frame->data = new_data; frame->allocated_size = frame->size; } - return 1; } -/* This method should not be called if frame->needs_framing is not 0. */ -static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes, +/* Decodes the serialized fake frame contained in incoming_bytes, and fills + * frame with the contents of the decoded frame. + * This method should not be called if frame->needs_framing is not 0. */ +static tsi_result tsi_fake_frame_decode(const unsigned char *incoming_bytes, size_t *incoming_bytes_size, tsi_fake_frame *frame) { size_t available_size = *incoming_bytes_size; @@ -147,7 +146,6 @@ static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes, if (frame->data == NULL) { frame->allocated_size = TSI_FAKE_FRAME_INITIAL_ALLOCATED_SIZE; frame->data = gpr_malloc(frame->allocated_size); - if (frame->data == NULL) return TSI_OUT_OF_RESOURCES; } if (frame->offset < TSI_FAKE_FRAME_HEADER_SIZE) { @@ -165,7 +163,7 @@ static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes, frame->offset += to_read_size; available_size -= to_read_size; frame->size = load32_little_endian(frame->data); - if (!tsi_fake_frame_ensure_size(frame)) return TSI_OUT_OF_RESOURCES; + tsi_fake_frame_ensure_size(frame); } to_read_size = frame->size - frame->offset; @@ -183,10 +181,12 @@ static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes, return TSI_OK; } -/* This method should not be called if frame->needs_framing is 0. */ -static tsi_result drain_frame_to_bytes(unsigned char *outgoing_bytes, - size_t *outgoing_bytes_size, - tsi_fake_frame *frame) { +/* Encodes a fake frame into its wire format and places the result in + * outgoing_bytes. outgoing_bytes_size indicates the size of the encoded frame. + * This method should not be called if frame->needs_framing is 0. */ +static tsi_result tsi_fake_frame_encode(unsigned char *outgoing_bytes, + size_t *outgoing_bytes_size, + tsi_fake_frame *frame) { size_t to_write_size = frame->size - frame->offset; if (!frame->needs_draining) return TSI_INTERNAL_ERROR; if (*outgoing_bytes_size < to_write_size) { @@ -200,17 +200,20 @@ static tsi_result drain_frame_to_bytes(unsigned char *outgoing_bytes, return TSI_OK; } -static tsi_result bytes_to_frame(unsigned char *bytes, size_t bytes_size, - tsi_fake_frame *frame) { +/* Sets the payload of a fake frame to contain the given data blob, where + * data_size indicates the size of data. */ +static tsi_result tsi_fake_frame_set_data(unsigned char *data, size_t data_size, + tsi_fake_frame *frame) { frame->offset = 0; - frame->size = bytes_size + TSI_FAKE_FRAME_HEADER_SIZE; - if (!tsi_fake_frame_ensure_size(frame)) return TSI_OUT_OF_RESOURCES; + frame->size = data_size + TSI_FAKE_FRAME_HEADER_SIZE; + tsi_fake_frame_ensure_size(frame); store32_little_endian((uint32_t)frame->size, frame->data); - memcpy(frame->data + TSI_FAKE_FRAME_HEADER_SIZE, bytes, bytes_size); + memcpy(frame->data + TSI_FAKE_FRAME_HEADER_SIZE, data, data_size); tsi_fake_frame_reset(frame, 1 /* needs draining */); return TSI_OK; } +/* Destroys the contents of a fake frame. */ static void tsi_fake_frame_destruct(tsi_fake_frame *frame) { if (frame->data != NULL) gpr_free(frame->data); } @@ -235,7 +238,7 @@ static tsi_result fake_protector_protect(tsi_frame_protector *self, if (frame->needs_draining) { drained_size = saved_output_size - *num_bytes_written; result = - drain_frame_to_bytes(protected_output_frames, &drained_size, frame); + tsi_fake_frame_encode(protected_output_frames, &drained_size, frame); *num_bytes_written += drained_size; protected_output_frames += drained_size; if (result != TSI_OK) { @@ -254,15 +257,15 @@ static tsi_result fake_protector_protect(tsi_frame_protector *self, size_t written_in_frame_size = 0; store32_little_endian((uint32_t)impl->max_frame_size, frame_header); written_in_frame_size = TSI_FAKE_FRAME_HEADER_SIZE; - result = fill_frame_from_bytes(frame_header, &written_in_frame_size, frame); + result = tsi_fake_frame_decode(frame_header, &written_in_frame_size, frame); if (result != TSI_INCOMPLETE_DATA) { - gpr_log(GPR_ERROR, "fill_frame_from_bytes returned %s", + gpr_log(GPR_ERROR, "tsi_fake_frame_decode returned %s", tsi_result_to_string(result)); return result; } } result = - fill_frame_from_bytes(unprotected_bytes, unprotected_bytes_size, frame); + tsi_fake_frame_decode(unprotected_bytes, unprotected_bytes_size, frame); if (result != TSI_OK) { if (result == TSI_INCOMPLETE_DATA) result = TSI_OK; return result; @@ -272,7 +275,7 @@ static tsi_result fake_protector_protect(tsi_frame_protector *self, if (!frame->needs_draining) return TSI_INTERNAL_ERROR; if (frame->offset != 0) return TSI_INTERNAL_ERROR; drained_size = saved_output_size - *num_bytes_written; - result = drain_frame_to_bytes(protected_output_frames, &drained_size, frame); + result = tsi_fake_frame_encode(protected_output_frames, &drained_size, frame); *num_bytes_written += drained_size; if (result == TSI_INCOMPLETE_DATA) result = TSI_OK; return result; @@ -292,8 +295,8 @@ static tsi_result fake_protector_protect_flush( store32_little_endian((uint32_t)frame->size, frame->data); /* Overwrite header. */ } - result = drain_frame_to_bytes(protected_output_frames, - protected_output_frames_size, frame); + result = tsi_fake_frame_encode(protected_output_frames, + protected_output_frames_size, frame); if (result == TSI_INCOMPLETE_DATA) result = TSI_OK; *still_pending_size = frame->size - frame->offset; return result; @@ -316,7 +319,7 @@ static tsi_result fake_protector_unprotect( /* Go past the header if needed. */ if (frame->offset == 0) frame->offset = TSI_FAKE_FRAME_HEADER_SIZE; drained_size = saved_output_size - *num_bytes_written; - result = drain_frame_to_bytes(unprotected_bytes, &drained_size, frame); + result = tsi_fake_frame_encode(unprotected_bytes, &drained_size, frame); unprotected_bytes += drained_size; *num_bytes_written += drained_size; if (result != TSI_OK) { @@ -330,7 +333,7 @@ static tsi_result fake_protector_unprotect( /* Now process the protected_bytes. */ if (frame->needs_draining) return TSI_INTERNAL_ERROR; - result = fill_frame_from_bytes(protected_frames_bytes, + result = tsi_fake_frame_decode(protected_frames_bytes, protected_frames_bytes_size, frame); if (result != TSI_OK) { if (result == TSI_INCOMPLETE_DATA) result = TSI_OK; @@ -342,7 +345,7 @@ static tsi_result fake_protector_unprotect( if (frame->offset != 0) return TSI_INTERNAL_ERROR; frame->offset = TSI_FAKE_FRAME_HEADER_SIZE; /* Go past the header. */ drained_size = saved_output_size - *num_bytes_written; - result = drain_frame_to_bytes(unprotected_bytes, &drained_size, frame); + result = tsi_fake_frame_encode(unprotected_bytes, &drained_size, frame); *num_bytes_written += drained_size; if (result == TSI_INCOMPLETE_DATA) result = TSI_OK; return result; @@ -360,6 +363,72 @@ static const tsi_frame_protector_vtable frame_protector_vtable = { fake_protector_unprotect, fake_protector_destroy, }; +/* --- tsi_handshaker_result methods implementation. ---*/ + +typedef struct { + tsi_handshaker_result base; + unsigned char *unused_bytes; + size_t unused_bytes_size; +} fake_handshaker_result; + +static tsi_result fake_handshaker_result_extract_peer( + const tsi_handshaker_result *self, tsi_peer *peer) { + /* Construct a tsi_peer with 1 property: certificate type. */ + tsi_result result = tsi_construct_peer(1, peer); + if (result != TSI_OK) return result; + result = tsi_construct_string_peer_property_from_cstring( + TSI_CERTIFICATE_TYPE_PEER_PROPERTY, TSI_FAKE_CERTIFICATE_TYPE, + &peer->properties[0]); + if (result != TSI_OK) tsi_peer_destruct(peer); + return result; +} + +static tsi_result fake_handshaker_result_create_frame_protector( + const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + tsi_frame_protector **protector) { + *protector = tsi_create_fake_frame_protector(max_output_protected_frame_size); + return TSI_OK; +} + +static tsi_result fake_handshaker_result_get_unused_bytes( + const tsi_handshaker_result *self, unsigned char **bytes, + size_t *bytes_size) { + fake_handshaker_result *result = (fake_handshaker_result *)self; + *bytes_size = result->unused_bytes_size; + *bytes = result->unused_bytes; + return TSI_OK; +} + +static void fake_handshaker_result_destroy(tsi_handshaker_result *self) { + fake_handshaker_result *result = (fake_handshaker_result *)self; + gpr_free(result->unused_bytes); + gpr_free(self); +} + +static const tsi_handshaker_result_vtable handshaker_result_vtable = { + fake_handshaker_result_extract_peer, + fake_handshaker_result_create_frame_protector, + fake_handshaker_result_get_unused_bytes, fake_handshaker_result_destroy, +}; + +static tsi_result fake_handshaker_result_create( + const unsigned char *unused_bytes, size_t unused_bytes_size, + tsi_handshaker_result **handshaker_result) { + if ((unused_bytes_size > 0 && unused_bytes == NULL) || + handshaker_result == NULL) { + return TSI_INVALID_ARGUMENT; + } + fake_handshaker_result *result = gpr_zalloc(sizeof(*result)); + result->base.vtable = &handshaker_result_vtable; + if (unused_bytes_size > 0) { + result->unused_bytes = gpr_malloc(unused_bytes_size); + memcpy(result->unused_bytes, unused_bytes, unused_bytes_size); + } + result->unused_bytes_size = unused_bytes_size; + *handshaker_result = &result->base; + return TSI_OK; +} + /* --- tsi_handshaker methods implementation. ---*/ static tsi_result fake_handshaker_get_bytes_to_send_to_peer( @@ -370,13 +439,13 @@ static tsi_result fake_handshaker_get_bytes_to_send_to_peer( *bytes_size = 0; return TSI_OK; } - if (!impl->outgoing.needs_draining) { + if (!impl->outgoing_frame.needs_draining) { tsi_fake_handshake_message next_message_to_send = impl->next_message_to_send + 2; const char *msg_string = tsi_fake_handshake_message_to_string(impl->next_message_to_send); - result = bytes_to_frame((unsigned char *)msg_string, strlen(msg_string), - &impl->outgoing); + result = tsi_fake_frame_set_data((unsigned char *)msg_string, + strlen(msg_string), &impl->outgoing_frame); if (result != TSI_OK) return result; if (next_message_to_send > TSI_FAKE_HANDSHAKE_MESSAGE_MAX) { next_message_to_send = TSI_FAKE_HANDSHAKE_MESSAGE_MAX; @@ -388,7 +457,7 @@ static tsi_result fake_handshaker_get_bytes_to_send_to_peer( } impl->next_message_to_send = next_message_to_send; } - result = drain_frame_to_bytes(bytes, bytes_size, &impl->outgoing); + result = tsi_fake_frame_encode(bytes, bytes_size, &impl->outgoing_frame); if (result != TSI_OK) return result; if (!impl->is_client && impl->next_message_to_send == TSI_FAKE_HANDSHAKE_MESSAGE_MAX) { @@ -414,12 +483,12 @@ static tsi_result fake_handshaker_process_bytes_from_peer( *bytes_size = 0; return TSI_OK; } - result = fill_frame_from_bytes(bytes, bytes_size, &impl->incoming); + result = tsi_fake_frame_decode(bytes, bytes_size, &impl->incoming_frame); if (result != TSI_OK) return result; /* We now have a complete frame. */ result = tsi_fake_handshake_message_from_string( - (const char *)impl->incoming.data + TSI_FAKE_FRAME_HEADER_SIZE, + (const char *)impl->incoming_frame.data + TSI_FAKE_FRAME_HEADER_SIZE, &received_msg); if (result != TSI_OK) { impl->result = result; @@ -434,7 +503,7 @@ static tsi_result fake_handshaker_process_bytes_from_peer( gpr_log(GPR_INFO, "%s received %s.", impl->is_client ? "Client" : "Server", tsi_fake_handshake_message_to_string(received_msg)); } - tsi_fake_frame_reset(&impl->incoming, 0 /* needs_draining */); + tsi_fake_frame_reset(&impl->incoming_frame, 0 /* needs_draining */); impl->needs_incoming_message = 0; if (impl->next_message_to_send == TSI_FAKE_HANDSHAKE_MESSAGE_MAX) { /* We're done. */ @@ -451,40 +520,86 @@ static tsi_result fake_handshaker_get_result(tsi_handshaker *self) { return impl->result; } -static tsi_result fake_handshaker_extract_peer(tsi_handshaker *self, - tsi_peer *peer) { - tsi_result result = tsi_construct_peer(1, peer); - if (result != TSI_OK) return result; - result = tsi_construct_string_peer_property_from_cstring( - TSI_CERTIFICATE_TYPE_PEER_PROPERTY, TSI_FAKE_CERTIFICATE_TYPE, - &peer->properties[0]); - if (result != TSI_OK) tsi_peer_destruct(peer); - return result; -} - -static tsi_result fake_handshaker_create_frame_protector( - tsi_handshaker *self, size_t *max_protected_frame_size, - tsi_frame_protector **protector) { - *protector = tsi_create_fake_protector(max_protected_frame_size); - if (*protector == NULL) return TSI_OUT_OF_RESOURCES; - return TSI_OK; -} - static void fake_handshaker_destroy(tsi_handshaker *self) { tsi_fake_handshaker *impl = (tsi_fake_handshaker *)self; - tsi_fake_frame_destruct(&impl->incoming); - tsi_fake_frame_destruct(&impl->outgoing); + tsi_fake_frame_destruct(&impl->incoming_frame); + tsi_fake_frame_destruct(&impl->outgoing_frame); + gpr_free(impl->outgoing_bytes_buffer); gpr_free(self); } +static tsi_result fake_handshaker_next( + tsi_handshaker *self, const unsigned char *received_bytes, + size_t received_bytes_size, unsigned char **bytes_to_send, + size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, + tsi_handshaker_on_next_done_cb cb, void *user_data) { + /* Sanity check the arguments. */ + if ((received_bytes_size > 0 && received_bytes == NULL) || + bytes_to_send == NULL || bytes_to_send_size == NULL || + handshaker_result == NULL) { + return TSI_INVALID_ARGUMENT; + } + tsi_fake_handshaker *handshaker = (tsi_fake_handshaker *)self; + tsi_result result = TSI_OK; + + /* Decode and process a handshake frame from the peer. */ + size_t consumed_bytes_size = received_bytes_size; + if (received_bytes_size > 0) { + result = fake_handshaker_process_bytes_from_peer(self, received_bytes, + &consumed_bytes_size); + if (result != TSI_OK) return result; + } + + /* Create a handshake message to send to the peer and encode it as a fake + * frame. */ + size_t offset = 0; + do { + size_t sent_bytes_size = handshaker->outgoing_bytes_buffer_size - offset; + result = fake_handshaker_get_bytes_to_send_to_peer( + self, handshaker->outgoing_bytes_buffer + offset, &sent_bytes_size); + offset += sent_bytes_size; + if (result == TSI_INCOMPLETE_DATA) { + handshaker->outgoing_bytes_buffer_size *= 2; + handshaker->outgoing_bytes_buffer = + gpr_realloc(handshaker->outgoing_bytes_buffer, + handshaker->outgoing_bytes_buffer_size); + } + } while (result == TSI_INCOMPLETE_DATA); + if (result != TSI_OK) return result; + *bytes_to_send = handshaker->outgoing_bytes_buffer; + *bytes_to_send_size = offset; + + /* Check if the handshake was completed. */ + if (fake_handshaker_get_result(self) == TSI_HANDSHAKE_IN_PROGRESS) { + *handshaker_result = NULL; + } else { + /* Calculate the unused bytes. */ + const unsigned char *unused_bytes = NULL; + size_t unused_bytes_size = received_bytes_size - consumed_bytes_size; + if (unused_bytes_size > 0) { + unused_bytes = received_bytes + consumed_bytes_size; + } + + /* Create a handshaker_result containing the unused bytes. */ + result = fake_handshaker_result_create(unused_bytes, unused_bytes_size, + handshaker_result); + if (result == TSI_OK) { + /* Indicate that the handshake has completed and that a handshaker_result + * has been created. */ + self->handshaker_result_created = true; + } + } + return result; +} + static const tsi_handshaker_vtable handshaker_vtable = { - fake_handshaker_get_bytes_to_send_to_peer, - fake_handshaker_process_bytes_from_peer, - fake_handshaker_get_result, - fake_handshaker_extract_peer, - fake_handshaker_create_frame_protector, + NULL, /* get_bytes_to_send_to_peer -- deprecated */ + NULL, /* process_bytes_from_peer -- deprecated */ + NULL, /* get_result -- deprecated */ + NULL, /* extract_peer -- deprecated */ + NULL, /* create_frame_protector -- deprecated */ fake_handshaker_destroy, - NULL, + fake_handshaker_next, }; tsi_handshaker *tsi_create_fake_handshaker(int is_client) { @@ -492,6 +607,9 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client) { impl->base.vtable = &handshaker_vtable; impl->is_client = is_client; impl->result = TSI_HANDSHAKE_IN_PROGRESS; + impl->outgoing_bytes_buffer_size = + TSI_FAKE_HANDSHAKER_OUTGOING_BUFFER_INITIAL_SIZE; + impl->outgoing_bytes_buffer = gpr_malloc(impl->outgoing_bytes_buffer_size); if (is_client) { impl->needs_incoming_message = 0; impl->next_message_to_send = TSI_FAKE_CLIENT_INIT; @@ -502,7 +620,7 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client) { return &impl->base; } -tsi_frame_protector *tsi_create_fake_protector( +tsi_frame_protector *tsi_create_fake_frame_protector( size_t *max_protected_frame_size) { tsi_fake_frame_protector *impl = gpr_zalloc(sizeof(*impl)); impl->max_frame_size = (max_protected_frame_size == NULL) diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h index 3d468c477f..934b3cbeb2 100644 --- a/src/core/tsi/fake_transport_security.h +++ b/src/core/tsi/fake_transport_security.h @@ -36,7 +36,7 @@ extern "C" { tsi_handshaker *tsi_create_fake_handshaker(int is_client); /* Creates a protector directly without going through the handshake phase. */ -tsi_frame_protector *tsi_create_fake_protector( +tsi_frame_protector *tsi_create_fake_frame_protector( size_t *max_protected_frame_size); #ifdef __cplusplus diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 37d4f038b7..1fd65928f9 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -411,15 +411,11 @@ static tsi_result do_ssl_read(SSL *ssl, unsigned char *unprotected_bytes, GPR_ASSERT(*unprotected_bytes_size <= INT_MAX); read_from_ssl = SSL_read(ssl, unprotected_bytes, (int)*unprotected_bytes_size); - if (read_from_ssl == 0) { - gpr_log(GPR_ERROR, "SSL_read returned 0 unexpectedly."); - return TSI_INTERNAL_ERROR; - } - if (read_from_ssl < 0) { + if (read_from_ssl <= 0) { read_from_ssl = SSL_get_error(ssl, read_from_ssl); switch (read_from_ssl) { - case SSL_ERROR_WANT_READ: - /* We need more data to finish the frame. */ + case SSL_ERROR_ZERO_RETURN: /* Received a close_notify alert. */ + case SSL_ERROR_WANT_READ: /* We need more data to finish the frame. */ *unprotected_bytes_size = 0; return TSI_OK; case SSL_ERROR_WANT_WRITE: diff --git a/src/cpp/README.md b/src/cpp/README.md index e9ef489a7c..d2896ad96f 100644 --- a/src/cpp/README.md +++ b/src/cpp/README.md @@ -47,7 +47,7 @@ below. # Build from Source ```sh - $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc + $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $ cd grpc $ git submodule update --init $ make @@ -60,14 +60,14 @@ You can find out how to build and run our simplest gRPC C++ example in our [C++ quick start](../../examples/cpp). For more detailed documentation on using gRPC in C++ , see our main -documentation site at [grpc.io](http://grpc.io), specifically: +documentation site at [grpc.io](https://grpc.io), specifically: -* [Overview](http://www.grpc.io/docs/): An introduction to gRPC with a simple +* [Overview](https://grpc.io/docs/): An introduction to gRPC with a simple Hello World example in all our supported languages, including C++. -* [gRPC Basics - C++](http://www.grpc.io/docs/tutorials/basic/c.html): +* [gRPC Basics - C++](https://grpc.io/docs/tutorials/basic/c.html): A tutorial that steps you through creating a simple gRPC C++ example application. -* [Asynchronous Basics - C++](http://www.grpc.io/docs/tutorials/async/helloasync-cpp.html): +* [Asynchronous Basics - C++](https://grpc.io/docs/tutorials/async/helloasync-cpp.html): A tutorial that shows you how to use gRPC C++'s asynchronous/non-blocking APIs. diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c9..038eb32e04 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -76,8 +76,9 @@ grpc::string Channel::GetServiceConfigJSON() const { &channel_info.service_config_json); } -Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) { +internal::Call Channel::CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = NULL; if (kRegistered) { @@ -109,10 +110,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, } grpc_census_call_set_context(c_call, context->census_context()); context->set_call(c_call, shared_from_this()); - return Call(c_call, this, cq); + return internal::Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -131,7 +133,7 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { } namespace { -class TagSaver final : public CompletionQueueTag { +class TagSaver final : public internal::CompletionQueueTag { public: explicit TagSaver(void* tag) : tag_(tag) {} ~TagSaver() override {} diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 66b1ef0e39..e65cb9903f 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -27,9 +27,11 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { return std::unique_ptr<GenericClientAsyncReaderWriter>( - GenericClientAsyncReaderWriter::Create( + GenericClientAsyncReaderWriter::internal::Create( channel_.get(), cq, - RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); + internal::RpcMethod(method.c_str(), + internal::RpcMethod::BIDI_STREAMING), + context, tag)); } } // namespace grpc diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index f34b0f3d58..000a03277b 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag); + auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; *tag = cq_tag; if (cq_tag->FinalizeResult(tag, ok)) { diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 815b607032..fc7feb79fe 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -36,11 +36,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( DefaultHealthCheckService* service) : service_(service), method_(nullptr) { - MethodHandler* handler = - new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>( + internal::MethodHandler* handler = + new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, + ByteBuffer>( std::mem_fn(&HealthCheckServiceImpl::Check), this); - method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC, - handler); + method_ = new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); AddMethod(method_); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 09d5cebe98..99d6680c50 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { private: const DefaultHealthCheckService* const service_; - RpcServiceMethod* method_; + internal::RpcServiceMethod* method_; }; DefaultHealthCheckService(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 04abb6fd3e..3bff9999b9 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -86,7 +86,8 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> +typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus> UnimplementedAsyncResponseOp; class Server::UnimplementedAsyncResponse final : public UnimplementedAsyncResponseOp { @@ -104,12 +105,12 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public CompletionQueueTag { +class ShutdownTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } }; -class DummyTag : public CompletionQueueTag { +class DummyTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { *status = true; @@ -117,15 +118,15 @@ class DummyTag : public CompletionQueueTag { } }; -class Server::SyncRequest final : public CompletionQueueTag { +class Server::SyncRequest final : public internal::CompletionQueueTag { public: - SyncRequest(RpcServiceMethod* method, void* tag) + SyncRequest(internal::RpcServiceMethod* method, void* tag) : method_(method), tag_(tag), in_flight_(false), - has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::SERVER_STREAMING), + has_request_payload_( + method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -202,14 +203,14 @@ class Server::SyncRequest final : public CompletionQueueTag { void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); + method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; cq_.Shutdown(); - CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ @@ -219,15 +220,15 @@ class Server::SyncRequest final : public CompletionQueueTag { private: CompletionQueue cq_; - Call call_; + internal::Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; }; private: - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; void* const tag_; bool in_flight_; const bool has_request_payload_; @@ -300,14 +301,15 @@ class Server::SyncRequestThreadManager : public ThreadManager { // object } - void AddSyncMethod(RpcServiceMethod* method, void* tag) { + void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + unknown_method_.reset(new internal::RpcServiceMethod( + "unknown", internal::RpcMethod::BIDI_STREAMING, + new internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -344,8 +346,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<RpcServiceMethod> unknown_method_; - std::unique_ptr<RpcServiceMethod> health_check_; + std::unique_ptr<internal::RpcServiceMethod> unknown_method_; + std::unique_ptr<internal::RpcServiceMethod> health_check_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -421,13 +423,13 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { grpc_server* Server::c_server() { return server_; } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - RpcServiceMethod* method) { + internal::RpcServiceMethod* method) { switch (method->method_type()) { - case RpcMethod::NORMAL_RPC: - case RpcMethod::SERVER_STREAMING: + case internal::RpcMethod::NORMAL_RPC: + case internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case RpcMethod::CLIENT_STREAMING: - case RpcMethod::BIDI_STREAMING: + case internal::RpcMethod::CLIENT_STREAMING: + case internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -448,7 +450,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { continue; } - RpcServiceMethod* method = it->get(); + internal::RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); @@ -588,7 +590,8 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -599,8 +602,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, - bool delete_on_finalize) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag, bool delete_on_finalize) : server_(server), context_(context), stream_(stream), @@ -622,7 +625,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } context_->set_call(call_); context_->cq_ = call_cq_; - Call call(call_, server_, call_cq_, server_->max_receive_message_size()); + internal::Call call(call_, server_, call_cq_, + server_->max_receive_message_size()); if (*status && call_) { context_->BeginCompletionOp(&call); } @@ -637,7 +641,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag) : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( @@ -651,7 +656,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest( ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : BaseAsyncRequest(server, context, stream, call_cq, tag, delete_on_finalize) { @@ -693,7 +698,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { Status status(StatusCode::UNIMPLEMENTED, ""); - UnknownMethodHandler::FillOps(request_->context(), this); + internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 3a6bca13b3..2e55ffbac4 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -37,7 +37,7 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp final : public CallOpSetInterface { +class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq CompletionOp() @@ -120,7 +120,8 @@ ServerContext::ServerContext() call_(nullptr), cq_(nullptr), sent_initial_metadata_(false), - compression_level_set_(false) {} + compression_level_set_(false), + has_pending_ops_(false) {} ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) : completion_op_(nullptr), @@ -130,7 +131,8 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) call_(nullptr), cq_(nullptr), sent_initial_metadata_(false), - compression_level_set_(false) { + compression_level_set_(false), + has_pending_ops_(false) { std::swap(*client_metadata_.arr(), *arr); client_metadata_.FillMap(); } @@ -144,7 +146,7 @@ ServerContext::~ServerContext() { } } -void ServerContext::BeginCompletionOp(Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call) { GPR_ASSERT(!completion_op_); completion_op_ = new CompletionOp(); if (has_notify_when_done_tag_) { @@ -153,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) { call->PerformOps(completion_op_); } -CompletionQueueTag* ServerContext::GetCompletionOpTag() { - return static_cast<CompletionQueueTag*>(completion_op_); +internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() { + return static_cast<internal::CompletionQueueTag*>(completion_op_); } void ServerContext::AddInitialMetadata(const grpc::string& key, diff --git a/src/csharp/README.md b/src/csharp/README.md index a973d2e597..6821ad225e 100644 --- a/src/csharp/README.md +++ b/src/csharp/README.md @@ -80,6 +80,6 @@ THE NATIVE DEPENDENCY Internally, gRPC C# uses a native library written in C (gRPC C core) and invokes its functionality via P/Invoke. The fact that a native library is used should be fully transparent to the users and just installing the `Grpc.Core` NuGet package is the only step needed to use gRPC C# on all supported platforms. -[API Reference]: http://www.grpc.io/grpc/csharp/ +[API Reference]: https://grpc.io/grpc/csharp/ [Helloworld Example]: ../../examples/csharp/helloworld -[RouteGuide Tutorial]: http://www.grpc.io/docs/tutorials/basic/csharp.html +[RouteGuide Tutorial]: https://grpc.io/docs/tutorials/basic/csharp.html diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index e917917359..71e6904008 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -383,7 +383,10 @@ class ClientStatusOp : public Op { public: ClientStatusOp() { grpc_metadata_array_init(&metadata_array); } - ~ClientStatusOp() { grpc_metadata_array_destroy(&metadata_array); } + ~ClientStatusOp() { + grpc_metadata_array_destroy(&metadata_array); + grpc_slice_unref(status_details); + } bool ParseOp(Local<Value> value, grpc_op *out) { out->data.recv_status_on_client.trailing_metadata = &metadata_array; diff --git a/src/node/src/client.js b/src/node/src/client.js index 6eb5f99cb7..edc51b7802 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -137,6 +137,7 @@ function _write(chunk, encoding, callback) { /* Once a write fails, just call the callback immediately to let the caller flush any pending writes. */ setImmediate(callback); + return; } try { message = this.serialize(chunk); @@ -149,6 +150,7 @@ function _write(chunk, encoding, callback) { this.call.cancelWithStatus(constants.status.INTERNAL, 'Serialization failure'); callback(e); + return; } if (_.isFinite(encoding)) { /* Attach the encoding if it is a finite number. This is the closest we diff --git a/src/node/src/protobuf_js_6_common.js b/src/node/src/protobuf_js_6_common.js index f1e4914fe2..0f07251677 100644 --- a/src/node/src/protobuf_js_6_common.js +++ b/src/node/src/protobuf_js_6_common.js @@ -49,7 +49,7 @@ exports.deserializeCls = function deserializeCls(cls, options) { * @return {cls} The resulting object */ return function deserialize(arg_buf) { - return cls.decode(arg_buf).toObject(conversion_options); + return cls.toObject(cls.decode(arg_buf), conversion_options); }; }; diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index d58d18057e..0b0b393e32 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -1398,13 +1398,25 @@ describe('Client reconnect', function() { }); server.bind('localhost:' + port, server_insecure_creds); server.start(); - client.echo(undefined, function(error, response) { - if (error) { - console.log(error); - } + + /* We create a new client, that will not throw an error if the server + * is not immediately available. Instead, it will wait for the server + * to be available, then the call will complete. Once this happens, the + * original client should be able to make a new call and connect to the + * restarted server without having the call fail due to connection + * errors. */ + var client2 = new Client('localhost:' + port, + grpc.credentials.createInsecure()); + client2.echo({value: 'test', value2: 3}, function(error, response) { assert.ifError(error); - assert.deepEqual(response, {value: '', value2: 0}); - done(); + client.echo(undefined, function(error, response) { + if (error) { + console.log(error); + } + assert.ifError(error); + assert.deepEqual(response, {value: '', value2: 0}); + done(); + }); }); }); }); diff --git a/src/node/tools/package.json b/src/node/tools/package.json index 542d52d48b..0a3c32734c 100644 --- a/src/node/tools/package.json +++ b/src/node/tools/package.json @@ -3,7 +3,7 @@ "version": "1.5.0-dev", "author": "Google Inc.", "description": "Tools for developing with gRPC on Node.js", - "homepage": "http://www.grpc.io/", + "homepage": "https://grpc.io/", "repository": { "type": "git", "url": "https://github.com/grpc/grpc.git" diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec index 351a45df58..22527d1572 100644 --- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec +++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec @@ -51,7 +51,7 @@ Pod::Spec.new do |s| The generated code will have a dependency on the gRPC Objective-C Proto runtime of the same version. The runtime can be obtained as the "gRPC-ProtoRPC" pod. DESC - s.homepage = 'http://www.grpc.io' + s.homepage = 'https://grpc.io' s.license = { :type => 'Apache License, Version 2.0', :text => <<-LICENSE diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index bd7d4ad691..a871ea895a 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -27,8 +27,8 @@ * immediately, unless flow control prevents it. * If it is throttled and keeps receiving values, as well as if it receives values before being * started, it will buffer them and propagate them in order as soon as its state becomes Started. - * If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and - * propagate the error immediately. + * If it receives an end of stream (via -writesFinishedWithError:), it will buffer the EOS after the + * last buffered value and issue it to the writeable after all buffered values are issued. * * Beware that a pipe of this type can't prevent receiving more values when it is paused (for * example if used to write data to a congested network connection). Because in such situations the diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index e4a7cc40f9..99cb0ad971 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -18,11 +18,13 @@ #import "GRXBufferedPipe.h" +@interface GRXBufferedPipe () +@property(atomic) id<GRXWriteable> writeable; +@end + @implementation GRXBufferedPipe { - id<GRXWriteable> _writeable; - NSMutableArray *_queue; - BOOL _inputIsFinished; NSError *_errorOrNil; + dispatch_queue_t _writeQueue; } @synthesize state = _state; @@ -33,99 +35,79 @@ - (instancetype)init { if (self = [super init]) { - _queue = [NSMutableArray array]; _state = GRXWriterStateNotStarted; + _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + dispatch_suspend(_writeQueue); } return self; } -- (id)popValue { - id value = _queue[0]; - [_queue removeObjectAtIndex:0]; - return value; -} - -- (void)writeBufferUntilPausedOrStopped { - while (_state == GRXWriterStateStarted && _queue.count > 0) { - [_writeable writeValue:[self popValue]]; - } - if (_inputIsFinished && _queue.count == 0) { - // Our writer finished normally while we were paused or not-started-yet. - [self finishWithError:_errorOrNil]; - } -} - #pragma mark GRXWriteable implementation -// Returns whether events can be simply propagated to the other end of the pipe. -- (BOOL)shouldFastForward { - return _state == GRXWriterStateStarted && _queue.count == 0; -} - - (void)writeValue:(id)value { - if (self.shouldFastForward) { - // Skip the queue. - [_writeable writeValue:value]; - } else { + if ([value respondsToSelector:@selector(copy)]) { // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. // So just buffer the new value. // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. - if ([value respondsToSelector:@selector(copy)]) { - value = [value copy]; - } - [_queue addObject:value]; + value = [value copy]; } + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^(void) { + [weakSelf.writeable writeValue:value]; + }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { - _inputIsFinished = YES; - _errorOrNil = errorOrNil; - if (errorOrNil || self.shouldFastForward) { - // No need to write pending values. - [self finishWithError:_errorOrNil]; - } + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + [weakSelf finishWithError:errorOrNil]; + }); } #pragma mark GRXWriter implementation - (void)setState:(GRXWriterState)newState { - // Manual transitions are only allowed from the started or paused states. - if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { - return; - } - - switch (newState) { - case GRXWriterStateFinished: - _state = newState; - _queue = nil; - // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the - // writeable to be messaged anymore. - _writeable = nil; - return; - case GRXWriterStatePaused: - _state = newState; + @synchronized (self) { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { return; - case GRXWriterStateStarted: - if (_state == GRXWriterStatePaused) { + } + + switch (newState) { + case GRXWriterStateFinished: + self.writeable = nil; + if (_state == GRXWriterStatePaused) { + dispatch_resume(_writeQueue); + } _state = newState; - [self writeBufferUntilPausedOrStopped]; - } - return; - case GRXWriterStateNotStarted: - return; + return; + case GRXWriterStatePaused: + if (_state == GRXWriterStateStarted) { + _state = newState; + dispatch_suspend(_writeQueue); + } + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + dispatch_resume(_writeQueue); + } + return; + case GRXWriterStateNotStarted: + return; + } } } - (void)startWithWriteable:(id<GRXWriteable>)writeable { + self.writeable = writeable; _state = GRXWriterStateStarted; - _writeable = writeable; - [self writeBufferUntilPausedOrStopped]; + dispatch_resume(_writeQueue); } - (void)finishWithError:(NSError *)errorOrNil { - id<GRXWriteable> writeable = _writeable; + [self.writeable writesFinishedWithError:errorOrNil]; self.state = GRXWriterStateFinished; - [writeable writesFinishedWithError:errorOrNil]; } @end diff --git a/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec b/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec index 22cd7e1c60..cd9464c453 100644 --- a/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec +++ b/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec @@ -3,7 +3,7 @@ Pod::Spec.new do |s| s.version = '0.0.1' s.license = 'Apache License, Version 2.0' s.authors = { 'gRPC contributors' => 'grpc-io@googlegroups.com' } - s.homepage = 'http://www.grpc.io/' + s.homepage = 'https://grpc.io/' s.summary = 'RemoteTest example' s.source = { :git => 'https://github.com/grpc/grpc.git' } diff --git a/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj b/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj index afc3da7116..6247d0b4e6 100644 --- a/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj +++ b/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj @@ -275,6 +275,7 @@ ONLY_ACTIVE_ARCH = YES; SDKROOT = iphoneos; SWIFT_OPTIMIZATION_LEVEL = "-Onone"; + SWIFT_VERSION = 2.3; TARGETED_DEVICE_FAMILY = "1,2"; }; name = Debug; @@ -312,6 +313,7 @@ IPHONEOS_DEPLOYMENT_TARGET = 8.4; MTL_ENABLE_DEBUG_INFO = NO; SDKROOT = iphoneos; + SWIFT_VERSION = 2.3; TARGETED_DEVICE_FAMILY = "1,2"; VALIDATE_PRODUCT = YES; }; @@ -327,6 +329,7 @@ PRODUCT_BUNDLE_IDENTIFIER = "io.grpc.$(PRODUCT_NAME:rfc1034identifier)"; PRODUCT_NAME = "$(TARGET_NAME)"; SWIFT_OBJC_BRIDGING_HEADER = ""; + SWIFT_VERSION = 2.3; USER_HEADER_SEARCH_PATHS = ""; }; name = Debug; @@ -341,6 +344,7 @@ PRODUCT_BUNDLE_IDENTIFIER = "io.grpc.$(PRODUCT_NAME:rfc1034identifier)"; PRODUCT_NAME = "$(TARGET_NAME)"; SWIFT_OBJC_BRIDGING_HEADER = ""; + SWIFT_VERSION = 2.3; USER_HEADER_SEARCH_PATHS = ""; }; name = Release; diff --git a/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec index 7fbf6371b3..1796c6d746 100644 --- a/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec +++ b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec @@ -3,7 +3,7 @@ Pod::Spec.new do |s| s.version = "0.0.1" s.license = "Apache License, Version 2.0" s.authors = { 'gRPC contributors' => 'grpc-io@googlegroups.com' } - s.homepage = "http://www.grpc.io/" + s.homepage = "https://grpc.io/" s.summary = "RemoteTest example" s.source = { :git => 'https://github.com/grpc/grpc.git' } diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index f152452b01..fa3ded4c0c 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -23,6 +23,8 @@ #import <RxLibrary/GRXWriteable.h> #import <RxLibrary/GRXWriter.h> +#define TEST_TIMEOUT 1 + // A mock of a GRXSingleValueHandler block that can be queried for how many times it was called and // what were the last values passed to it. // @@ -140,26 +142,38 @@ #pragma mark BufferedPipe - (void)testBufferedPipePropagatesValue { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; + id anyValue = @7; // If: GRXBufferedPipe *pipe = [GRXBufferedPipe pipe]; [pipe startWithWriteable:writeable]; [pipe writeValue:anyValue]; + [pipe writesFinishedWithError:nil]; // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, anyValue); XCTAssertEqualObjects(handler.errorOrNil, nil); + } - (void)testBufferedPipePropagatesError { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil]; // If: @@ -168,15 +182,20 @@ [pipe writesFinishedWithError:anyError]; // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, nil); XCTAssertEqualObjects(handler.errorOrNil, anyError); } - (void)testBufferedPipeFinishWriteWhilePaused { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; id anyValue = @7; // If: @@ -188,6 +207,7 @@ [pipe startWithWriteable:writeable]; // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, anyValue); XCTAssertEqualObjects(handler.errorOrNil, nil); diff --git a/src/php/README.md b/src/php/README.md index 90c8cb386a..11f99e134c 100644 --- a/src/php/README.md +++ b/src/php/README.md @@ -100,7 +100,7 @@ the `composer` and `protoc` binaries. You can find out how to get these Clone this repository ```sh -$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc +$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc ``` Build and install the gRPC C core library @@ -129,7 +129,7 @@ $ sudo make install You will need the source code to run tests ```sh -$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc +$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $ cd grpc $ git pull --recurse-submodules && git submodule update --init --recursive ``` diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 8cc3f55ac1..48016642f7 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -152,7 +152,6 @@ message ServerConfig { // Buffer pool size (no buffer pool specified if unset) int32 resource_quota_size = 1001; - repeated ChannelArg channel_args = 1002; } diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst index 28a2714568..f047243f82 100644 --- a/src/python/grpcio/README.rst +++ b/src/python/grpcio/README.rst @@ -46,7 +46,7 @@ package named :code:`python-dev`). :: $ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice - $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT + $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT $ cd $REPO_ROOT $ git submodule update --init diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index a8c69720fd..5950bfa0e6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -518,7 +518,6 @@ cdef extern from "grpc/compression.h": ctypedef struct grpc_compression_options: uint32_t enabled_algorithms_bitset - grpc_compression_algorithm default_compression_algorithm int grpc_compression_algorithm_parse( grpc_slice value, grpc_compression_algorithm *algorithm) nogil diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 5819a624f7..ea5bdbae58 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -39,6 +39,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/log_windows.c', 'src/core/lib/support/mpscq.c', 'src/core/lib/support/murmur_hash.c', + 'src/core/lib/support/stack_lockfree.c', 'src/core/lib/support/string.c', 'src/core/lib/support/string_posix.c', 'src/core/lib/support/string_util_windows.c', diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index fc02a24c87..83e0ead391 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -46,7 +46,7 @@ setuptools.setup( description='Standard Health Checking Service for gRPC', author='The gRPC Authors', author_email='grpc-io@googlegroups.com', - url='http://www.grpc.io', + url='https://grpc.io', license='Apache License 2.0', package_dir=PACKAGE_DIRECTORIES, packages=setuptools.find_packages('.'), diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py index 920f3b2b48..20edbc4ec0 100644 --- a/src/python/grpcio_reflection/setup.py +++ b/src/python/grpcio_reflection/setup.py @@ -48,7 +48,7 @@ setuptools.setup( description='Standard Protobuf Reflection Service for gRPC', author='The gRPC Authors', author_email='grpc-io@googlegroups.com', - url='http://www.grpc.io', + url='https://grpc.io', package_dir=PACKAGE_DIRECTORIES, packages=setuptools.find_packages('.'), install_requires=INSTALL_REQUIRES, diff --git a/src/ruby/README.md b/src/ruby/README.md index 266d6b2c16..5c7dae654a 100644 --- a/src/ruby/README.md +++ b/src/ruby/README.md @@ -68,5 +68,5 @@ Directory structure is the layout for [ruby extensions][] [ruby extensions]:http://guides.rubygems.org/gems-with-extensions/ [rubydoc]: http://www.rubydoc.info/gems/grpc -[grpc.io]: http://www.grpc.io/docs/quickstart/ruby.html +[grpc.io]: https://grpc.io/docs/quickstart/ruby.html [Debian jessie-backports]:http://backports.debian.org/Instructions/ |