diff options
Diffstat (limited to 'src')
35 files changed, 1273 insertions, 286 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 56716493dc..7986aca696 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -135,6 +135,7 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file, "grpcpp/impl/codegen/method_handler_impl.h", "grpcpp/impl/codegen/proto_utils.h", "grpcpp/impl/codegen/rpc_method.h", + "grpcpp/impl/codegen/server_callback.h", "grpcpp/impl/codegen/service_type.h", "grpcpp/impl/codegen/status.h", "grpcpp/impl/codegen/stub_options.h", @@ -700,9 +701,9 @@ void PrintHeaderServerMethodSync(grpc_generator::Printer* printer, printer->Print(method->GetTrailingComments("//").c_str()); } -// Helper generator. Disabled the sync API for Request and Response, then adds +// Helper generator. Disables the sync API for Request and Response, then adds // in an async API for RealRequest and RealResponse types. This is to be used -// to generate async and raw APIs. +// to generate async and raw async APIs. void PrintHeaderServerAsyncMethodsHelper( grpc_generator::Printer* printer, const grpc_generator::Method* method, std::map<grpc::string, grpc::string>* vars) { @@ -829,6 +830,170 @@ void PrintHeaderServerMethodAsync(grpc_generator::Printer* printer, printer->Print(*vars, "};\n"); } +// Helper generator. Disables the sync API for Request and Response, then adds +// in a callback API for RealRequest and RealResponse types. This is to be used +// to generate callback and raw callback APIs. +void PrintHeaderServerCallbackMethodsHelper( + grpc_generator::Printer* printer, const grpc_generator::Method* method, + std::map<grpc::string, grpc::string>* vars) { + if (method->NoStreaming()) { + printer->Print( + *vars, + "// disable synchronous version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, const $Request$* request, " + "$Response$* response) override {\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + printer->Print( + *vars, + "virtual void $Method$(" + "::grpc::ServerContext* context, const $RealRequest$* request, " + "$RealResponse$* response, " + "::grpc::experimental::ServerCallbackRpcController* " + "controller) { controller->Finish(::grpc::Status(" + "::grpc::StatusCode::UNIMPLEMENTED, \"\")); }\n"); + } else if (ClientOnlyStreaming(method)) { + printer->Print( + *vars, + "// disable synchronous version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerReader< $Request$>* reader, " + "$Response$* response) override {\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + } else if (ServerOnlyStreaming(method)) { + printer->Print( + *vars, + "// disable synchronous version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, const $Request$* request, " + "::grpc::ServerWriter< $Response$>* writer) override " + "{\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + } else if (method->BidiStreaming()) { + printer->Print( + *vars, + "// disable synchronous version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerReaderWriter< $Response$, $Request$>* stream) " + " override {\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + } +} + +void PrintHeaderServerMethodCallback( + grpc_generator::Printer* printer, const grpc_generator::Method* method, + std::map<grpc::string, grpc::string>* vars) { + (*vars)["Method"] = method->name(); + // These will be disabled + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + // These will be used for the callback API + (*vars)["RealRequest"] = method->input_type_name(); + (*vars)["RealResponse"] = method->output_type_name(); + printer->Print(*vars, "template <class BaseClass>\n"); + printer->Print( + *vars, + "class ExperimentalWithCallbackMethod_$Method$ : public BaseClass {\n"); + printer->Print( + " private:\n" + " void BaseClassMustBeDerivedFromService(const Service *service) {}\n"); + printer->Print(" public:\n"); + printer->Indent(); + printer->Print(*vars, "ExperimentalWithCallbackMethod_$Method$() {\n"); + if (method->NoStreaming()) { + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n" + " new ::grpc::internal::CallbackUnaryHandler< " + "ExperimentalWithCallbackMethod_$Method$<BaseClass>, $RealRequest$, " + "$RealResponse$>(\n" + " [this](::grpc::ServerContext* context,\n" + " const $RealRequest$* request,\n" + " $RealResponse$* response,\n" + " ::grpc::experimental::ServerCallbackRpcController* " + "controller) {\n" + " this->$" + "Method$(context, request, response, controller);\n" + " }, this));\n"); + } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods + } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods + } else if (method->BidiStreaming()) { + // TODO(vjpai): Add in code generation for all streaming methods + } + printer->Print(*vars, "}\n"); + printer->Print(*vars, + "~ExperimentalWithCallbackMethod_$Method$() override {\n" + " BaseClassMustBeDerivedFromService(this);\n" + "}\n"); + PrintHeaderServerCallbackMethodsHelper(printer, method, vars); + printer->Outdent(); + printer->Print(*vars, "};\n"); +} + +void PrintHeaderServerMethodRawCallback( + grpc_generator::Printer* printer, const grpc_generator::Method* method, + std::map<grpc::string, grpc::string>* vars) { + (*vars)["Method"] = method->name(); + // These will be disabled + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + // These will be used for raw API + (*vars)["RealRequest"] = "::grpc::ByteBuffer"; + (*vars)["RealResponse"] = "::grpc::ByteBuffer"; + printer->Print(*vars, "template <class BaseClass>\n"); + printer->Print(*vars, + "class ExperimentalWithRawCallbackMethod_$Method$ : public " + "BaseClass {\n"); + printer->Print( + " private:\n" + " void BaseClassMustBeDerivedFromService(const Service *service) {}\n"); + printer->Print(" public:\n"); + printer->Indent(); + printer->Print(*vars, "ExperimentalWithRawCallbackMethod_$Method$() {\n"); + if (method->NoStreaming()) { + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n" + " new ::grpc::internal::CallbackUnaryHandler< " + "ExperimentalWithRawCallbackMethod_$Method$<BaseClass>, $RealRequest$, " + "$RealResponse$>(\n" + " [this](::grpc::ServerContext* context,\n" + " const $RealRequest$* request,\n" + " $RealResponse$* response,\n" + " ::grpc::experimental::ServerCallbackRpcController* " + "controller) {\n" + " this->$" + "Method$(context, request, response, controller);\n" + " }, this));\n"); + } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods + } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods + } else if (method->BidiStreaming()) { + // TODO(vjpai): Add in code generation for all streaming methods + } + printer->Print(*vars, "}\n"); + printer->Print(*vars, + "~ExperimentalWithRawCallbackMethod_$Method$() override {\n" + " BaseClassMustBeDerivedFromService(this);\n" + "}\n"); + PrintHeaderServerCallbackMethodsHelper(printer, method, vars); + printer->Outdent(); + printer->Print(*vars, "};\n"); +} + void PrintHeaderServerMethodStreamedUnary( grpc_generator::Printer* printer, const grpc_generator::Method* method, std::map<grpc::string, grpc::string>* vars) { @@ -1137,7 +1302,7 @@ void PrintHeaderService(grpc_generator::Printer* printer, printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["method_name"] = service->method(i).get()->name(); + (*vars)["method_name"] = service->method(i)->name(); printer->Print(*vars, "WithAsyncMethod_$method_name$<"); } printer->Print("Service"); @@ -1146,6 +1311,24 @@ void PrintHeaderService(grpc_generator::Printer* printer, } printer->Print(" AsyncService;\n"); + // Server side - Callback + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["Idx"] = as_string(i); + PrintHeaderServerMethodCallback(printer, service->method(i).get(), vars); + } + + printer->Print("typedef "); + + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["method_name"] = service->method(i)->name(); + printer->Print(*vars, "ExperimentalWithCallbackMethod_$method_name$<"); + } + printer->Print("Service"); + for (int i = 0; i < service->method_count(); ++i) { + printer->Print(" >"); + } + printer->Print(" ExperimentalCallbackService;\n"); + // Server side - Generic for (int i = 0; i < service->method_count(); ++i) { (*vars)["Idx"] = as_string(i); @@ -1158,6 +1341,12 @@ void PrintHeaderService(grpc_generator::Printer* printer, PrintHeaderServerMethodRaw(printer, service->method(i).get(), vars); } + // Server side - Raw Callback + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["Idx"] = as_string(i); + PrintHeaderServerMethodRawCallback(printer, service->method(i).get(), vars); + } + // Server side - Streamed Unary for (int i = 0; i < service->method_count(); ++i) { (*vars)["Idx"] = as_string(i); @@ -1167,7 +1356,7 @@ void PrintHeaderService(grpc_generator::Printer* printer, printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["method_name"] = service->method(i).get()->name(); + (*vars)["method_name"] = service->method(i)->name(); if (service->method(i)->NoStreaming()) { printer->Print(*vars, "WithStreamedUnaryMethod_$method_name$<"); } @@ -1189,7 +1378,7 @@ void PrintHeaderService(grpc_generator::Printer* printer, printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["method_name"] = service->method(i).get()->name(); + (*vars)["method_name"] = service->method(i)->name(); auto method = service->method(i); if (ServerOnlyStreaming(method.get())) { printer->Print(*vars, "WithSplitStreamingMethod_$method_name$<"); @@ -1207,7 +1396,7 @@ void PrintHeaderService(grpc_generator::Printer* printer, // Server side - typedef for controlled both unary and server-side streaming printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["method_name"] = service->method(i).get()->name(); + (*vars)["method_name"] = service->method(i)->name(); auto method = service->method(i); if (ServerOnlyStreaming(method.get())) { printer->Print(*vars, "WithSplitStreamingMethod_$method_name$<"); @@ -1333,6 +1522,7 @@ grpc::string GetSourceIncludes(grpc_generator::File* file, "grpcpp/impl/codegen/client_callback.h", "grpcpp/impl/codegen/method_handler_impl.h", "grpcpp/impl/codegen/rpc_service_method.h", + "grpcpp/impl/codegen/server_callback.h", "grpcpp/impl/codegen/service_type.h", "grpcpp/impl/codegen/sync_stream.h"}; std::vector<grpc::string> headers(headers_strs, array_end(headers_strs)); @@ -1577,7 +1767,7 @@ void PrintSourceService(grpc_generator::Printer* printer, printer->Print(*vars, "static const char* $prefix$$Service$_method_names[] = {\n"); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["Method"] = service->method(i).get()->name(); + (*vars)["Method"] = service->method(i)->name(); printer->Print(*vars, " \"/$Package$$Service$/$Method$\",\n"); } printer->Print(*vars, "};\n\n"); diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index daf1b89b09..91894689c3 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -2951,6 +2951,27 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { } } +// If the channel is in TRANSIENT_FAILURE and the call is not +// wait_for_ready=true, fails the call and returns true. +static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; + if (grpc_connectivity_state_check(&chand->state_tracker) == + GRPC_CHANNEL_TRANSIENT_FAILURE && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { + pending_batches_fail( + elem, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "channel is in state TRANSIENT_FAILURE"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + true /* yield_call_combiner */); + return true; + } + return false; +} + // Invoked once resolver results are available. static void process_service_config_and_start_lb_pick_locked( grpc_call_element* elem) { @@ -2958,6 +2979,9 @@ static void process_service_config_and_start_lb_pick_locked( // Only get service config data on the first attempt. if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); + // Check this after applying service config, since it may have + // affected the call's wait_for_ready value. + if (fail_call_if_in_transient_failure(elem)) return; } // Start LB pick. grpc_core::LbPicker::StartLocked(elem); @@ -3127,6 +3151,16 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { // We do not yet have an LB policy, so wait for a resolver result. if (GPR_UNLIKELY(!chand->started_resolving)) { start_resolving_locked(chand); + } else { + // Normally, we want to do this check in + // process_service_config_and_start_lb_pick_locked(), so that we + // can honor the wait_for_ready setting in the service config. + // However, if the channel is in TRANSIENT_FAILURE at this point, that + // means that the resolver has returned a failure, so we're not going + // to get a service config right away. In that case, we fail the + // call now based on the wait_for_ready value passed in from the + // application. + if (fail_call_if_in_transient_failure(elem)) return; } // Create a new waiter, which will delete itself when done. grpc_core::New<grpc_core::ResolverResultWaiter>(elem); diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index 400d99b07c..591637aa86 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -19,6 +19,7 @@ #include <grpc/support/port_platform.h> #include <stdint.h> +#include <stdio.h> #include "src/core/ext/filters/client_channel/health/health_check_client.h" @@ -298,6 +299,11 @@ HealthCheckClient::CallState::~CallState() { health_check_client_.get(), this); } if (call_ != nullptr) GRPC_SUBCHANNEL_CALL_UNREF(call_, "call_ended"); + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) { + if (context_[i].destroy != nullptr) { + context_[i].destroy(context_[i].value); + } + } // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if // any, so that it can release any internal references it may be @@ -345,6 +351,7 @@ void HealthCheckClient::CallState::StartCall() { } // Initialize payload and batch. memset(&batch_, 0, sizeof(batch_)); + payload_.context = context_; batch_.payload = &payload_; // on_complete callback takes ref, handled manually. Ref(DEBUG_LOCATION, "on_complete").release(); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 1e9c5ee4d7..60a32022f5 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -212,9 +212,17 @@ static void chttp2_connector_connect(grpc_connector* con, GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx); GPR_ASSERT(!c->connecting); c->connecting = true; - grpc_tcp_client_connect(&c->connected, &c->endpoint, args->interested_parties, - args->channel_args, &addr, args->deadline); + grpc_closure* closure = &c->connected; + grpc_endpoint** ep = &c->endpoint; gpr_mu_unlock(&c->mu); + // In some implementations, the closure can be flushed before + // grpc_tcp_client_connect and since the closure requires access to c->mu, + // this can result in a deadlock. Refer + // https://github.com/grpc/grpc/issues/16427 + // grpc_tcp_client_connect would fill c->endpoint with proper contents and we + // make sure that we would still exist at that point by taking a ref. + grpc_tcp_client_connect(closure, ep, args->interested_parties, + args->channel_args, &addr, args->deadline); } static const grpc_connector_vtable chttp2_connector_vtable = { diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 07b304b320..287bc0454e 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -37,8 +37,10 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/handshaker_registry.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" @@ -114,9 +116,16 @@ static void on_handshake_done(void* arg, grpc_error* error) { server_connection_state* connection_state = static_cast<server_connection_state*>(args->user_data); gpr_mu_lock(&connection_state->svr_state->mu); + grpc_resource_user* resource_user = grpc_server_get_default_resource_user( + connection_state->svr_state->server); if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { const char* error_str = grpc_error_string(error); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); + grpc_resource_user* resource_user = grpc_server_get_default_resource_user( + connection_state->svr_state->server); + if (resource_user != nullptr) { + grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { // We were shut down after handshaking completed successfully, so // destroy the endpoint here. @@ -135,12 +144,12 @@ static void on_handshake_done(void* arg, grpc_error* error) { // handshaker may have handed off the connection to some external // code, so we can just clean up here without creating a transport. if (args->endpoint != nullptr) { - grpc_transport* transport = - grpc_create_chttp2_transport(args->args, args->endpoint, false); + grpc_transport* transport = grpc_create_chttp2_transport( + args->args, args->endpoint, false, resource_user); grpc_server_setup_transport( connection_state->svr_state->server, transport, connection_state->accepting_pollset, args->args, - grpc_chttp2_transport_get_socket_uuid(transport)); + grpc_chttp2_transport_get_socket_uuid(transport), resource_user); // Use notify_on_receive_settings callback to enforce the // handshake deadline. connection_state->transport = @@ -159,6 +168,11 @@ static void on_handshake_done(void* arg, grpc_error* error) { connection_state, grpc_schedule_on_exec_ctx); grpc_timer_init(&connection_state->timer, connection_state->deadline, &connection_state->on_timeout); + } else { + if (resource_user != nullptr) { + grpc_resource_user_free(resource_user, + GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } } } grpc_handshake_manager_pending_list_remove( @@ -183,6 +197,20 @@ static void on_accept(void* arg, grpc_endpoint* tcp, gpr_free(acceptor); return; } + grpc_resource_user* resource_user = + grpc_server_get_default_resource_user(state->server); + if (resource_user != nullptr && + !grpc_resource_user_safe_alloc(resource_user, + GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) { + gpr_log( + GPR_ERROR, + "Memory quota exhausted, rejecting the connection, no handshaking."); + gpr_mu_unlock(&state->mu); + grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE); + grpc_endpoint_destroy(tcp); + gpr_free(acceptor); + return; + } grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); grpc_handshake_manager_pending_list_add(&state->pending_handshake_mgrs, handshake_mgr); @@ -339,8 +367,14 @@ grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr, arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ); if (grpc_channel_arg_get_bool(arg, false)) { + char* host; + char* port; + gpr_split_host_port(addr, &host, &port); + // allocated host's ownership is passed to ListenSocketNode. state->channelz_listen_socket = - grpc_core::MakeRefCounted<grpc_core::channelz::ListenSocketNode>(); + grpc_core::MakeRefCounted<grpc_core::channelz::ListenSocketNode>( + grpc_core::UniquePtr<char>(host), *port_num); + gpr_free(port); socket_uuid = state->channelz_listen_socket->uuid(); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 8a481bb7d5..04874a9494 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -478,7 +478,8 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { static void init_transport(grpc_chttp2_transport* t, const grpc_channel_args* channel_args, - grpc_endpoint* ep, bool is_client) { + grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user) { GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); @@ -491,6 +492,7 @@ static void init_transport(grpc_chttp2_transport* t, t->endpoint_reading = 1; t->next_stream_id = is_client ? 1 : 2; t->is_client = is_client; + t->resource_user = resource_user; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -778,6 +780,10 @@ static void destroy_stream_locked(void* sp, grpc_error* error) { s->flow_control.Destroy(); + if (t->resource_user != nullptr) { + grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); + } + GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE); @@ -816,7 +822,21 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, if (t->channel_callback.accept_stream == nullptr) { return nullptr; } - grpc_chttp2_stream* accepting; + // Don't accept the stream if memory quota doesn't allow. Note that we should + // simply refuse the stream here instead of canceling the stream after it's + // accepted since the latter will create the call which costs much memory. + if (t->resource_user != nullptr && + !grpc_resource_user_safe_alloc(t->resource_user, + GRPC_RESOURCE_QUOTA_CALL_SIZE)) { + gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream."); + grpc_slice_buffer_add( + &t->qbuf, + grpc_chttp2_rst_stream_create( + id, static_cast<uint32_t>(GRPC_HTTP2_REFUSED_STREAM), nullptr)); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); + return nullptr; + } + grpc_chttp2_stream* accepting = nullptr; GPR_ASSERT(t->accepting_stream == nullptr); t->accepting_stream = &accepting; t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, @@ -3185,10 +3205,11 @@ intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport) { } grpc_transport* grpc_create_chttp2_transport( - const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { + const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user) { grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>( gpr_zalloc(sizeof(grpc_chttp2_transport))); - init_transport(t, channel_args, ep, is_client); + init_transport(t, channel_args, ep, is_client, resource_user); return &t->base; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index e5872fee43..b3fe1c082e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -32,7 +32,8 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount; extern bool g_flow_control_enabled; grpc_transport* grpc_create_chttp2_transport( - const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client); + const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user = nullptr); intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport); diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc index 4bdd4309a4..a0a7534594 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc @@ -32,7 +32,7 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, grpc_transport_one_way_stats* stats) { static const size_t frame_size = 13; grpc_slice slice = GRPC_SLICE_MALLOC(frame_size); - stats->framing_bytes += frame_size; + if (stats != nullptr) stats->framing_bytes += frame_size; uint8_t* p = GRPC_SLICE_START_PTR(slice); // Frame size. diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ff26dd9255..202017641b 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -285,6 +285,8 @@ struct grpc_chttp2_transport { grpc_endpoint* ep; char* peer_string; + grpc_resource_user* resource_user; + grpc_combiner* combiner; grpc_closure* notify_on_receive_settings; diff --git a/src/core/lib/channel/channel_stack_builder.cc b/src/core/lib/channel/channel_stack_builder.cc index df5a783631..8b3008f221 100644 --- a/src/core/lib/channel/channel_stack_builder.cc +++ b/src/core/lib/channel/channel_stack_builder.cc @@ -40,6 +40,7 @@ struct grpc_channel_stack_builder { // various set/get-able parameters grpc_channel_args* args; grpc_transport* transport; + grpc_resource_user* resource_user; char* target; const char* name; }; @@ -157,6 +158,11 @@ void grpc_channel_stack_builder_set_channel_arguments( builder->args = grpc_channel_args_copy(args); } +const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments( + grpc_channel_stack_builder* builder) { + return builder->args; +} + void grpc_channel_stack_builder_set_transport( grpc_channel_stack_builder* builder, grpc_transport* transport) { GPR_ASSERT(builder->transport == nullptr); @@ -168,9 +174,15 @@ grpc_transport* grpc_channel_stack_builder_get_transport( return builder->transport; } -const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments( +void grpc_channel_stack_builder_set_resource_user( + grpc_channel_stack_builder* builder, grpc_resource_user* resource_user) { + GPR_ASSERT(builder->resource_user == nullptr); + builder->resource_user = resource_user; +} + +grpc_resource_user* grpc_channel_stack_builder_get_resource_user( grpc_channel_stack_builder* builder) { - return builder->args; + return builder->resource_user; } bool grpc_channel_stack_builder_append_filter( diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index 9196de9378..89c30e0c5e 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -54,6 +54,14 @@ void grpc_channel_stack_builder_set_transport( grpc_transport* grpc_channel_stack_builder_get_transport( grpc_channel_stack_builder* builder); +/// Attach \a resource_user to the builder (does not take ownership) +void grpc_channel_stack_builder_set_resource_user( + grpc_channel_stack_builder* builder, grpc_resource_user* resource_user); + +/// Fetch attached resource user +grpc_resource_user* grpc_channel_stack_builder_get_resource_user( + grpc_channel_stack_builder* builder); + /// Set channel arguments: copies args void grpc_channel_stack_builder_set_channel_arguments( grpc_channel_stack_builder* builder, const grpc_channel_args* args); diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 33577d890a..032654b861 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -374,7 +374,8 @@ grpc_json* SocketNode::RenderJson() { return top_level_json; } -ListenSocketNode::ListenSocketNode() : BaseNode(EntityType::kSocket) {} +ListenSocketNode::ListenSocketNode(UniquePtr<char> host, int port) + : BaseNode(EntityType::kSocket), host_(std::move(host)), port_(port) {} grpc_json* ListenSocketNode::RenderJson() { // We need to track these three json objects to build our object @@ -388,6 +389,21 @@ grpc_json* ListenSocketNode::RenderJson() { json_iterator = nullptr; json_iterator = grpc_json_add_number_string_child(json, json_iterator, "socketId", uuid()); + json = top_level_json; + json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "local", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "tcpip_address", + nullptr, GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = + grpc_json_add_number_string_child(json, json_iterator, "port", port_); + json_iterator = grpc_json_create_child(json_iterator, json, "ip_address", + host_.get(), GRPC_JSON_STRING, false); + return top_level_json; } diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 88551befc8..8e66623142 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -268,10 +268,15 @@ class SocketNode : public BaseNode { // Handles channelz bookkeeping for listen sockets class ListenSocketNode : public BaseNode { public: - ListenSocketNode(); + // ListenSocketNode takes ownership of host. + ListenSocketNode(UniquePtr<char> host, int port); ~ListenSocketNode() override {} grpc_json* RenderJson() override; + + private: + UniquePtr<char> host_; + int port_; }; // Creation functions diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index 1fe2fad3e1..8fc1b58adc 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -222,6 +222,24 @@ char* grpc_channelz_get_servers(intptr_t start_server_id) { return grpc_core::channelz::ChannelzRegistry::GetServers(start_server_id); } +char* grpc_channelz_get_server(intptr_t server_id) { + grpc_core::channelz::BaseNode* server_node = + grpc_core::channelz::ChannelzRegistry::Get(server_id); + if (server_node == nullptr || + server_node->type() != + grpc_core::channelz::BaseNode::EntityType::kServer) { + return nullptr; + } + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* channel_json = server_node->RenderJson(); + channel_json->key = "server"; + grpc_json_link_child(json, channel_json, nullptr); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} + char* grpc_channelz_get_server_sockets(intptr_t server_id, intptr_t start_socket_id) { grpc_core::channelz::BaseNode* base_node = diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index b6fc7579f7..7e4b3c9b2f 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -90,7 +90,8 @@ struct grpc_resource_user { grpc_closure_list on_allocated; /* True if we are currently trying to allocate from the quota, false if not */ bool allocating; - /* How many bytes of allocations are outstanding */ + /* The amount of memory (in bytes) that has been requested from this user + * asynchronously but hasn't been granted yet. */ int64_t outstanding_allocations; /* True if we are currently trying to add ourselves to the non-free quota list, false otherwise */ @@ -135,6 +136,9 @@ struct grpc_resource_quota { int64_t size; /* Amount of free memory in the resource quota */ int64_t free_pool; + /* Used size of memory in the resource quota. Updated as soon as the resource + * users start to allocate or free the memory. */ + gpr_atm used; gpr_atm last_size; @@ -371,6 +375,7 @@ static bool rq_reclaim_from_per_user_free_pool( while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) { gpr_mu_lock(&resource_user->mu); + resource_user->added_to_free_pool = false; if (resource_user->free_pool > 0) { int64_t amt = resource_user->free_pool; resource_user->free_pool = 0; @@ -386,6 +391,13 @@ static bool rq_reclaim_from_per_user_free_pool( gpr_mu_unlock(&resource_user->mu); return true; } else { + if (grpc_resource_quota_trace.enabled()) { + gpr_log(GPR_INFO, + "RQ %s %s: failed to reclaim_from_per_user_free_pool; " + "free_pool = %" PRId64 "; rq_free_pool = %" PRId64, + resource_quota->name, resource_user->name, + resource_user->free_pool, resource_quota->free_pool); + } gpr_mu_unlock(&resource_user->mu); } } @@ -622,6 +634,7 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) { resource_quota->combiner = grpc_combiner_create(); resource_quota->free_pool = INT64_MAX; resource_quota->size = INT64_MAX; + resource_quota->used = 0; gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); gpr_mu_init(&resource_quota->thread_count_mu); resource_quota->max_threads = INT_MAX; @@ -712,7 +725,7 @@ size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) { */ grpc_resource_quota* grpc_resource_quota_from_channel_args( - const grpc_channel_args* channel_args) { + const grpc_channel_args* channel_args, bool create) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { if (channel_args->args[i].type == GRPC_ARG_POINTER) { @@ -724,7 +737,7 @@ grpc_resource_quota* grpc_resource_quota_from_channel_args( } } } - return grpc_resource_quota_create(nullptr); + return create ? grpc_resource_quota_create(nullptr) : nullptr; } static void* rq_copy(void* rq) { @@ -863,33 +876,68 @@ void grpc_resource_user_free_threads(grpc_resource_user* resource_user, gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu); } -void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, - grpc_closure* optional_on_done) { - gpr_mu_lock(&resource_user->mu); +static void resource_user_alloc_locked(grpc_resource_user* resource_user, + size_t size, + grpc_closure* optional_on_done) { ru_ref_by(resource_user, static_cast<gpr_atm>(size)); resource_user->free_pool -= static_cast<int64_t>(size); - resource_user->outstanding_allocations += static_cast<int64_t>(size); if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, resource_user->free_pool); } if (resource_user->free_pool < 0) { - grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, - GRPC_ERROR_NONE); + if (optional_on_done != nullptr) { + resource_user->outstanding_allocations += static_cast<int64_t>(size); + grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, + GRPC_ERROR_NONE); + } if (!resource_user->allocating) { resource_user->allocating = true; GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE); } } else { - resource_user->outstanding_allocations -= static_cast<int64_t>(size); GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE); } +} + +bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user, + size_t size) { + if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false; + gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + bool cas_success; + do { + gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used); + gpr_atm new_used = used + size; + if (static_cast<size_t>(new_used) > + grpc_resource_quota_peek_size(resource_quota)) { + gpr_mu_unlock(&resource_user->mu); + return false; + } + cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used); + } while (!cas_success); + resource_user_alloc_locked(resource_user, size, nullptr); + gpr_mu_unlock(&resource_user->mu); + return true; +} + +void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, + grpc_closure* optional_on_done) { + // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this + // because some tests become flaky after the change. + gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + gpr_atm_no_barrier_fetch_add(&resource_quota->used, size); + resource_user_alloc_locked(resource_user, size, optional_on_done); gpr_mu_unlock(&resource_user->mu); } void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) { gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size); + GPR_ASSERT(prior >= static_cast<long>(size)); bool was_zero_or_negative = resource_user->free_pool <= 0; resource_user->free_pool += static_cast<int64_t>(size); if (grpc_resource_quota_trace.enabled()) { @@ -940,6 +988,12 @@ void grpc_resource_user_slice_allocator_init( void grpc_resource_user_alloc_slices( grpc_resource_user_slice_allocator* slice_allocator, size_t length, size_t count, grpc_slice_buffer* dest) { + if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) { + GRPC_CLOSURE_SCHED( + &slice_allocator->on_allocated, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown")); + return; + } slice_allocator->length = length; slice_allocator->count = count; slice_allocator->dest = dest; diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index 7b0ed7417a..1c79b52e3f 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -65,11 +65,16 @@ extern grpc_core::TraceFlag grpc_resource_quota_trace; +// TODO(juanlishen): This is a hack. We need to do real accounting instead of +// hard coding. +constexpr size_t GRPC_RESOURCE_QUOTA_CALL_SIZE = 15 * 1024; +constexpr size_t GRPC_RESOURCE_QUOTA_CHANNEL_SIZE = 50 * 1024; + grpc_resource_quota* grpc_resource_quota_ref_internal( grpc_resource_quota* resource_quota); void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota); grpc_resource_quota* grpc_resource_quota_from_channel_args( - const grpc_channel_args* channel_args); + const grpc_channel_args* channel_args, bool create = true); /* Return a number indicating current memory pressure: 0.0 ==> no memory usage @@ -109,11 +114,21 @@ bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user, void grpc_resource_user_free_threads(grpc_resource_user* resource_user, int thread_count); -/* Allocate from the resource user (and its quota). - If optional_on_done is NULL, then allocate immediately. This may push the - quota over-limit, at which point reclamation will kick in. - If optional_on_done is non-NULL, it will be scheduled when the allocation has - been granted by the quota. */ +/* Allocates from the resource user 'size' worth of memory if this won't exceed + * the resource quota's total size. Returns whether the allocation is done + * successfully. If allocated successfully, the memory should be freed by the + * caller eventually. */ +bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user, + size_t size); +/* Allocates from the resource user 'size' worth of memory. + * If optional_on_done is NULL, then allocate immediately. This may push the + * quota over-limit, at which point reclamation will kick in. The caller is + * always responsible to free the memory eventually. + * If optional_on_done is non-NULL, it will be scheduled without error when the + * allocation has been granted by the quota, and the caller is responsible to + * free the memory eventually. Or it may be scheduled with an error, in which + * case the caller fails to allocate the memory and shouldn't free the memory. + */ void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, grpc_closure* optional_on_done); /* Release memory back to the quota */ diff --git a/src/core/lib/iomgr/tcp_client_custom.cc b/src/core/lib/iomgr/tcp_client_custom.cc index 9389861d07..73344b18d8 100644 --- a/src/core/lib/iomgr/tcp_client_custom.cc +++ b/src/core/lib/iomgr/tcp_client_custom.cc @@ -81,9 +81,8 @@ static void on_alarm(void* acp, grpc_error* error) { } } -static void custom_connect_callback(grpc_custom_socket* socket, - grpc_error* error) { - grpc_core::ExecCtx exec_ctx; +static void custom_connect_callback_internal(grpc_custom_socket* socket, + grpc_error* error) { grpc_custom_tcp_connect* connect = socket->connector; int done; grpc_closure* closure = connect->closure; @@ -100,6 +99,18 @@ static void custom_connect_callback(grpc_custom_socket* socket, GRPC_CLOSURE_SCHED(closure, error); } +static void custom_connect_callback(grpc_custom_socket* socket, + grpc_error* error) { + if (grpc_core::ExecCtx::Get() == nullptr) { + /* If we are being run on a thread which does not have an exec_ctx created + * yet, we should create one. */ + grpc_core::ExecCtx exec_ctx; + custom_connect_callback_internal(socket, error); + } else { + custom_connect_callback_internal(socket, error); + } +} + static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.cc b/src/core/lib/security/credentials/google_default/google_default_credentials.cc index c456ffaf5d..fcab252959 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.cc +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.cc @@ -49,8 +49,8 @@ /* -- Default credentials. -- */ -static grpc_channel_credentials* g_default_credentials = nullptr; static int g_compute_engine_detection_done = 0; +static int g_need_compute_engine_creds = 0; static gpr_mu g_state_mu; static gpr_once g_once = GPR_ONCE_INIT; static grpc_core::internal::grpc_gce_tenancy_checker g_gce_tenancy_checker = @@ -182,19 +182,13 @@ grpc_channel_credentials* grpc_google_default_credentials_create(void) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Failed to create Google credentials"); grpc_error* err; + int need_compute_engine_creds = 0; grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_google_default_credentials_create(void)", 0, ()); gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_state_mu); - - if (g_default_credentials != nullptr) { - result = grpc_channel_credentials_ref(g_default_credentials); - goto end; - } - /* First, try the environment variable. */ err = create_default_creds_from_path( gpr_getenv(GRPC_GOOGLE_CREDENTIALS_ENV_VAR), &call_creds); @@ -207,55 +201,50 @@ grpc_channel_credentials* grpc_google_default_credentials_create(void) { if (err == GRPC_ERROR_NONE) goto end; error = grpc_error_add_child(error, err); + gpr_mu_lock(&g_state_mu); /* At last try to see if we're on compute engine (do the detection only once since it requires a network test). */ if (!g_compute_engine_detection_done) { - int need_compute_engine_creds = g_gce_tenancy_checker(); + g_need_compute_engine_creds = g_gce_tenancy_checker(); g_compute_engine_detection_done = 1; - if (need_compute_engine_creds) { - call_creds = grpc_google_compute_engine_credentials_create(nullptr); - if (call_creds == nullptr) { - error = grpc_error_add_child( - error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Failed to get credentials from network")); - } - } } + need_compute_engine_creds = g_need_compute_engine_creds; + gpr_mu_unlock(&g_state_mu); -end: - if (result == nullptr) { - if (call_creds != nullptr) { - /* Create google default credentials. */ - auto creds = static_cast<grpc_google_default_channel_credentials*>( - gpr_zalloc(sizeof(grpc_google_default_channel_credentials))); - creds->base.vtable = &google_default_credentials_vtable; - creds->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_GOOGLE_DEFAULT; - gpr_ref_init(&creds->base.refcount, 1); - creds->ssl_creds = - grpc_ssl_credentials_create(nullptr, nullptr, nullptr, nullptr); - GPR_ASSERT(creds->ssl_creds != nullptr); - grpc_alts_credentials_options* options = - grpc_alts_credentials_client_options_create(); - creds->alts_creds = grpc_alts_credentials_create(options); - grpc_alts_credentials_options_destroy(options); - /* Add a global reference so that it can be cached and re-served. */ - g_default_credentials = grpc_composite_channel_credentials_create( - &creds->base, call_creds, nullptr); - GPR_ASSERT(g_default_credentials != nullptr); - grpc_channel_credentials_unref(&creds->base); - grpc_call_credentials_unref(call_creds); - result = grpc_channel_credentials_ref(g_default_credentials); - } else { - gpr_log(GPR_ERROR, "Could not create google default credentials."); + if (need_compute_engine_creds) { + call_creds = grpc_google_compute_engine_credentials_create(nullptr); + if (call_creds == nullptr) { + error = grpc_error_add_child( + error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to get credentials from network")); } } - gpr_mu_unlock(&g_state_mu); - if (result == nullptr) { - GRPC_LOG_IF_ERROR("grpc_google_default_credentials_create", error); + +end: + if (call_creds != nullptr) { + /* Create google default credentials. */ + auto creds = static_cast<grpc_google_default_channel_credentials*>( + gpr_zalloc(sizeof(grpc_google_default_channel_credentials))); + creds->base.vtable = &google_default_credentials_vtable; + creds->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_GOOGLE_DEFAULT; + gpr_ref_init(&creds->base.refcount, 1); + creds->ssl_creds = + grpc_ssl_credentials_create(nullptr, nullptr, nullptr, nullptr); + GPR_ASSERT(creds->ssl_creds != nullptr); + grpc_alts_credentials_options* options = + grpc_alts_credentials_client_options_create(); + creds->alts_creds = grpc_alts_credentials_create(options); + grpc_alts_credentials_options_destroy(options); + result = grpc_composite_channel_credentials_create(&creds->base, call_creds, + nullptr); + GPR_ASSERT(result != nullptr); + grpc_channel_credentials_unref(&creds->base); + grpc_call_credentials_unref(call_creds); } else { - GRPC_ERROR_UNREF(error); + gpr_log(GPR_ERROR, "Could not create google default credentials: %s", + grpc_error_string(error)); } - + GRPC_ERROR_UNREF(error); return result; } @@ -266,21 +255,17 @@ void set_gce_tenancy_checker_for_testing(grpc_gce_tenancy_checker checker) { g_gce_tenancy_checker = checker; } -} // namespace internal -} // namespace grpc_core - void grpc_flush_cached_google_default_credentials(void) { grpc_core::ExecCtx exec_ctx; gpr_once_init(&g_once, init_default_credentials); gpr_mu_lock(&g_state_mu); - if (g_default_credentials != nullptr) { - grpc_channel_credentials_unref(g_default_credentials); - g_default_credentials = nullptr; - } g_compute_engine_detection_done = 0; gpr_mu_unlock(&g_state_mu); } +} // namespace internal +} // namespace grpc_core + /* -- Well known credentials path. -- */ static grpc_well_known_credentials_path_getter creds_path_getter = nullptr; diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.h b/src/core/lib/security/credentials/google_default/google_default_credentials.h index a7dd0ea8ae..b9e2efb04f 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.h +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.h @@ -45,8 +45,6 @@ typedef struct { grpc_channel_credentials* ssl_creds; } grpc_google_default_channel_credentials; -void grpc_flush_cached_google_default_credentials(void); - namespace grpc_core { namespace internal { @@ -54,6 +52,9 @@ typedef bool (*grpc_gce_tenancy_checker)(void); void set_gce_tenancy_checker_for_testing(grpc_gce_tenancy_checker checker); +// TEST-ONLY. Reset the internal global state. +void grpc_flush_cached_google_default_credentials(void); + } // namespace internal } // namespace grpc_core diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index d7095c24d4..230b89b3fc 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -39,6 +39,7 @@ #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -63,6 +64,7 @@ struct grpc_channel { grpc_compression_options compression_options; gpr_atm call_size_estimate; + grpc_resource_user* resource_user; gpr_mu registered_call_mu; registered_call* registered_calls; @@ -82,6 +84,8 @@ grpc_channel* grpc_channel_create_with_builder( char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder)); grpc_channel_args* args = grpc_channel_args_copy( grpc_channel_stack_builder_get_channel_arguments(builder)); + grpc_resource_user* resource_user = + grpc_channel_stack_builder_get_resource_user(builder); grpc_channel* channel; if (channel_stack_type == GRPC_SERVER_CHANNEL) { GRPC_STATS_INC_SERVER_CHANNELS_CREATED(); @@ -101,9 +105,11 @@ grpc_channel* grpc_channel_create_with_builder( } channel->target = target; + channel->resource_user = resource_user; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT; - size_t channel_tracer_max_memory = 0; // default to off + size_t channel_tracer_max_memory = + GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT; bool internal_channel = false; // this creates the default ChannelNode. Different types of channels may // override this to ensure a correct ChannelNode is created. @@ -142,7 +148,6 @@ grpc_channel* grpc_channel_create_with_builder( 0x1; /* always support no compression */ } else if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)) { - GPR_ASSERT(channel_tracer_max_memory == 0); const grpc_integer_options options = { GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX}; channel_tracer_max_memory = @@ -217,7 +222,8 @@ grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node( grpc_channel* grpc_channel_create(const char* target, const grpc_channel_args* input_args, grpc_channel_stack_type channel_stack_type, - grpc_transport* optional_transport) { + grpc_transport* optional_transport, + grpc_resource_user* resource_user) { grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); const grpc_core::UniquePtr<char> default_authority = get_default_authority(input_args); @@ -227,11 +233,17 @@ grpc_channel* grpc_channel_create(const char* target, grpc_channel_args_destroy(args); grpc_channel_stack_builder_set_target(builder, target); grpc_channel_stack_builder_set_transport(builder, optional_transport); + grpc_channel_stack_builder_set_resource_user(builder, resource_user); if (!grpc_channel_init_create_stack(builder, channel_stack_type)) { grpc_channel_stack_builder_destroy(builder); + if (resource_user != nullptr) { + grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } return nullptr; } - return grpc_channel_create_with_builder(builder, channel_stack_type); + grpc_channel* channel = + grpc_channel_create_with_builder(builder, channel_stack_type); + return channel; } size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) { @@ -441,6 +453,10 @@ static void destroy_channel(void* arg, grpc_error* error) { GRPC_MDELEM_UNREF(rc->authority); gpr_free(rc); } + if (channel->resource_user != nullptr) { + grpc_resource_user_free(channel->resource_user, + GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel->target); gpr_free(channel); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 4ac76b8a29..ab00b8e94f 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -29,7 +29,8 @@ grpc_channel* grpc_channel_create(const char* target, const grpc_channel_args* args, grpc_channel_stack_type channel_stack_type, - grpc_transport* optional_transport); + grpc_transport* optional_transport, + grpc_resource_user* resource_user = nullptr); grpc_channel* grpc_channel_create_with_builder( grpc_channel_stack_builder* builder, diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 35ab2c3bce..72391ca697 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -189,6 +189,8 @@ typedef struct { struct grpc_server { grpc_channel_args* channel_args; + grpc_resource_user* default_resource_user; + grpc_completion_queue** cqs; grpc_pollset** pollsets; size_t cq_count; @@ -1024,6 +1026,15 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { grpc_slice_from_static_string("Server created")); } + if (args != nullptr) { + grpc_resource_quota* resource_quota = + grpc_resource_quota_from_channel_args(args, false /* create */); + if (resource_quota != nullptr) { + server->default_resource_user = + grpc_resource_user_create(resource_quota, "default"); + } + } + return server; } @@ -1122,7 +1133,8 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args, - intptr_t socket_uuid) { + intptr_t socket_uuid, + grpc_resource_user* resource_user) { size_t num_registered_methods; size_t alloc; registered_method* rm; @@ -1135,7 +1147,8 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, uint32_t max_probes = 0; grpc_transport_op* op = nullptr; - channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport); + channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport, + resource_user); chand = static_cast<channel_data*>( grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0) ->channel_data); @@ -1330,6 +1343,13 @@ void grpc_server_shutdown_and_notify(grpc_server* server, channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */, GRPC_ERROR_NONE); + + if (server->default_resource_user != nullptr) { + grpc_resource_quota_unref( + grpc_resource_user_quota(server->default_resource_user)); + grpc_resource_user_shutdown(server->default_resource_user); + grpc_resource_user_unref(server->default_resource_user); + } } void grpc_server_cancel_all_calls(grpc_server* server) { @@ -1546,6 +1566,10 @@ const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) { return server->channel_args; } +grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) { + return server->default_resource_user; +} + int grpc_server_has_open_connections(grpc_server* server) { int r; gpr_mu_lock(&server->mu_global); diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 33c205417e..27038fdb7a 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -47,7 +47,8 @@ void grpc_server_add_listener(grpc_server* server, void* listener, void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args, - intptr_t socket_uuid); + intptr_t socket_uuid, + grpc_resource_user* resource_user = nullptr); /* fills in the uuids of all sockets used for connections on this server */ void grpc_server_populate_server_sockets( @@ -63,6 +64,8 @@ grpc_core::channelz::ServerNode* grpc_server_get_channelz_node( const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server); +grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server); + int grpc_server_has_open_connections(grpc_server* server); /* Do not call this before grpc_server_start. Returns the pollsets and the diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 2cab41b3f5..f5c1b4e2c5 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -33,6 +33,7 @@ #include <grpcpp/client_context.h> #include <grpcpp/completion_queue.h> #include <grpcpp/impl/call.h> +#include <grpcpp/impl/codegen/call_op_set.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/rpc_method.h> @@ -57,9 +58,8 @@ Channel::Channel( std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> interceptor_creators) : host_(host), c_channel_(channel) { - auto* vector = interceptor_creators.release(); - if (vector != nullptr) { - interceptor_creators_ = std::move(*vector); + if (interceptor_creators != nullptr) { + interceptor_creators_ = std::move(*interceptor_creators); } g_gli_initializer.summon(); } @@ -112,9 +112,10 @@ void ChannelResetConnectionBackoff(Channel* channel) { } // namespace experimental -internal::Call Channel::CreateCall(const internal::RpcMethod& method, - ClientContext* context, - CompletionQueue* cq) { +internal::Call Channel::CreateCallInternal(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq, + int interceptor_pos) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = nullptr; if (kRegistered) { @@ -147,17 +148,22 @@ internal::Call Channel::CreateCall(const internal::RpcMethod& method, } grpc_census_call_set_context(c_call, context->census_context()); context->set_call(c_call, shared_from_this()); - return internal::Call(c_call, this, cq); + + auto* info = context->set_client_rpc_info( + method.name(), this, interceptor_creators_, interceptor_pos); + return internal::Call(c_call, this, cq, info); +} + +internal::Call Channel::CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) { + return CreateCallInternal(method, context, cq, 0); } void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, internal::Call* call) { - static const size_t MAX_OPS = 8; - size_t nops = 0; - grpc_op cops[MAX_OPS]; - ops->FillOps(call->call(), cops, &nops); - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops, - ops->cq_tag(), nullptr)); + ops->FillOps( + call); // Make a copy of call. It's fine since Call just has pointers } void* Channel::RegisterMethod(const char* method) { @@ -219,7 +225,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { static void Run(grpc_experimental_completion_queue_functor* cb, int) { auto* callback = static_cast<ShutdownCallback*>(cb); delete callback->cq_; - grpc_core::Delete(callback); + delete callback; } private: @@ -232,7 +238,7 @@ CompletionQueue* Channel::CallbackCQ() { // if there is no explicit per-channel CQ registered std::lock_guard<std::mutex> l(mu_); if (callback_cq_ == nullptr) { - auto* shutdown_callback = grpc_core::New<ShutdownCallback>(); + auto* shutdown_callback = new ShutdownCallback; callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 6893201e2e..d93a54aed7 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,10 +60,10 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); + auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; - *tag = cq_tag; - if (cq_tag->FinalizeResult(tag, ok)) { + *tag = core_cq_tag; + if (core_cq_tag->FinalizeResult(tag, ok)) { return GOT_EVENT; } break; @@ -87,9 +87,9 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { flushed_ = true; if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, &res)) { - auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag); + auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag); *ok = res == 1; - if (cq_tag->FinalizeResult(tag, ok)) { + if (core_cq_tag->FinalizeResult(tag, ok)) { return true; } } diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 619aacadaa..cfaa2e7b19 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -102,6 +102,13 @@ size_t CoreCodegen::grpc_byte_buffer_length(grpc_byte_buffer* bb) { return ::grpc_byte_buffer_length(bb); } +grpc_call_error CoreCodegen::grpc_call_start_batch(grpc_call* call, + const grpc_op* ops, + size_t nops, void* tag, + void* reserved) { + return ::grpc_call_start_batch(call, ops, nops, tag, reserved); +} + grpc_call_error CoreCodegen::grpc_call_cancel_with_status( grpc_call* call, grpc_status_code status, const char* description, void* reserved) { diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc index 79ed9102e5..9ecb9de7e4 100644 --- a/src/cpp/server/channelz/channelz_service.cc +++ b/src/cpp/server/channelz/channelz_service.cc @@ -20,9 +20,6 @@ #include "src/cpp/server/channelz/channelz_service.h" -#include <google/protobuf/text_format.h> -#include <google/protobuf/util/json_util.h> - #include <grpc/grpc.h> #include <grpc/support/alloc.h> @@ -33,13 +30,14 @@ Status ChannelzService::GetTopChannels( channelz::v1::GetTopChannelsResponse* response) { char* json_str = grpc_channelz_get_top_channels(request->start_channel_id()); if (json_str == nullptr) { - return Status(INTERNAL, "grpc_channelz_get_top_channels returned null"); + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_top_channels returned null"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -49,13 +47,31 @@ Status ChannelzService::GetServers( channelz::v1::GetServersResponse* response) { char* json_str = grpc_channelz_get_servers(request->start_server_id()); if (json_str == nullptr) { - return Status(INTERNAL, "grpc_channelz_get_servers returned null"); + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_servers returned null"); + } + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); + gpr_free(json_str); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); + } + return Status::OK; +} + +Status ChannelzService::GetServer(ServerContext* unused, + const channelz::v1::GetServerRequest* request, + channelz::v1::GetServerResponse* response) { + char* json_str = grpc_channelz_get_server(request->server_id()); + if (json_str == nullptr) { + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_server returned null"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -66,13 +82,14 @@ Status ChannelzService::GetServerSockets( char* json_str = grpc_channelz_get_server_sockets(request->server_id(), request->start_socket_id()); if (json_str == nullptr) { - return Status(INTERNAL, "grpc_channelz_get_server_sockets returned null"); + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_server_sockets returned null"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -82,13 +99,13 @@ Status ChannelzService::GetChannel( channelz::v1::GetChannelResponse* response) { char* json_str = grpc_channelz_get_channel(request->channel_id()); if (json_str == nullptr) { - return Status(NOT_FOUND, "No object found for that ChannelId"); + return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -98,13 +115,14 @@ Status ChannelzService::GetSubchannel( channelz::v1::GetSubchannelResponse* response) { char* json_str = grpc_channelz_get_subchannel(request->subchannel_id()); if (json_str == nullptr) { - return Status(NOT_FOUND, "No object found for that SubchannelId"); + return Status(StatusCode::NOT_FOUND, + "No object found for that SubchannelId"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -114,13 +132,13 @@ Status ChannelzService::GetSocket(ServerContext* unused, channelz::v1::GetSocketResponse* response) { char* json_str = grpc_channelz_get_socket(request->socket_id()); if (json_str == nullptr) { - return Status(NOT_FOUND, "No object found for that SocketId"); + return Status(StatusCode::NOT_FOUND, "No object found for that SocketId"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } diff --git a/src/cpp/server/channelz/channelz_service.h b/src/cpp/server/channelz/channelz_service.h index 590b5d492e..b4a66ba1c6 100644 --- a/src/cpp/server/channelz/channelz_service.h +++ b/src/cpp/server/channelz/channelz_service.h @@ -36,6 +36,10 @@ class ChannelzService final : public channelz::v1::Channelz::Service { Status GetServers(ServerContext* unused, const channelz::v1::GetServersRequest* request, channelz::v1::GetServersResponse* response) override; + // implementation of GetServer rpc + Status GetServer(ServerContext* unused, + const channelz::v1::GetServerRequest* request, + channelz::v1::GetServerResponse* response) override; // implementation of GetServerSockets rpc Status GetServerSockets( ServerContext* unused, diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 8417c45e64..0dc03b6876 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -71,7 +71,9 @@ ServerBuilder::~ServerBuilder() { std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( bool is_frequently_polled) { ServerCompletionQueue* cq = new ServerCompletionQueue( - is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING); + GRPC_CQ_NEXT, + is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING, + nullptr); cqs_.push_back(cq); return std::unique_ptr<ServerCompletionQueue>(cq); } @@ -256,14 +258,22 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { // Create completion queues to listen to incoming rpc requests for (int i = 0; i < sync_server_settings_.num_cqs; i++) { - sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type)); + sync_server_cqs->emplace_back( + new ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr)); } } - std::unique_ptr<Server> server(new Server( - max_receive_message_size_, &args, sync_server_cqs, - sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec, resource_quota_)); + // == Determine if the server has any callback methods == + bool has_callback_methods = false; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_callback_methods()) { + has_callback_methods = true; + break; + } + } + + // TODO(vjpai): Add a section here for plugins once they can support callback + // methods if (has_sync_methods) { // This is a Sync server @@ -275,6 +285,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_settings_.cq_timeout_msec); } + if (has_callback_methods) { + gpr_log(GPR_INFO, "Callback server."); + } + + std::unique_ptr<Server> server(new Server( + max_receive_message_size_, &args, sync_server_cqs, + sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec, resource_quota_, + std::move(interceptor_creators_))); + ServerInitializer* initializer = server->initializer(); // Register all the completion queues with the server. i.e @@ -288,6 +308,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { num_frequently_polled_cqs++; } + if (has_callback_methods) { + auto* cq = server->CallbackCQ(); + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); + num_frequently_polled_cqs++; + } + // cqs_ contains the completion queue added by calling the ServerBuilder's // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by // calling Next() or AsyncNext()) and hence are not safe to be used for diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 7aeddff643..870ee84e3e 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -27,7 +27,9 @@ #include <grpcpp/completion_queue.h> #include <grpcpp/generic/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> +#include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> +#include <grpcpp/impl/codegen/server_interceptor.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/method_handler_impl.h> #include <grpcpp/impl/rpc_service_method.h> @@ -38,8 +40,10 @@ #include <grpcpp/support/time.h> #include "src/core/ext/transport/inproc/inproc_transport.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" +#include "src/core/lib/surface/completion_queue.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/thread_manager/thread_manager.h" @@ -127,10 +131,13 @@ class Server::UnimplementedAsyncResponse final ~UnimplementedAsyncResponse() { delete request_; } bool FinalizeResult(void** tag, bool* status) override { - internal::CallOpSet< - internal::CallOpSendInitialMetadata, - internal::CallOpServerSendStatus>::FinalizeResult(tag, status); - delete this; + if (internal::CallOpSet< + internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) { + delete this; + } else { + // The tag was swallowed due to interception. We will see it again. + } return false; } @@ -140,9 +147,9 @@ class Server::UnimplementedAsyncResponse final class Server::SyncRequest final : public internal::CompletionQueueTag { public: - SyncRequest(internal::RpcServiceMethod* method, void* tag) + SyncRequest(internal::RpcServiceMethod* method, void* method_tag) : method_(method), - tag_(tag), + method_tag_(method_tag), in_flight_(false), has_request_payload_( method->method_type() == internal::RpcMethod::NORMAL_RPC || @@ -169,10 +176,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - if (tag_) { + if (method_tag_) { if (GRPC_CALL_OK != grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, + server, method_tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, notify_cq, this)) { TeardownRequest(); @@ -204,17 +211,25 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { return true; } + // The CallData class represents a call that is "active" as opposed + // to just being requested. It wraps and takes ownership of the cq from + // the call request class CallData final { public: explicit CallData(Server* server, SyncRequest* mrd) : cq_(mrd->cq_), - call_(mrd->call_, server, &cq_, server->max_receive_message_size()), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), request_payload_(has_request_payload_ ? mrd->request_payload_ : nullptr), + request_(nullptr), method_(mrd->method_), - server_(server) { + call_(mrd->call_, server, &cq_, server->max_receive_message_size(), + ctx_.set_server_rpc_info(method_->name(), + server->interceptor_creators_)), + server_(server), + global_callbacks_(nullptr), + resources_(false) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -230,38 +245,79 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, bool resources) { - ctx_.BeginCompletionOp(&call_); - global_callbacks->PreSynchronousRequest(&ctx_); - auto* handler = resources ? method_->handler() - : server_->resource_exhausted_handler_.get(); - handler->RunHandler(internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_payload_)); - global_callbacks->PostSynchronousRequest(&ctx_); - request_payload_ = nullptr; - - cq_.Shutdown(); + global_callbacks_ = global_callbacks; + resources_ = resources; + + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); + + if (has_request_payload_) { + // Set interception point for RECV MESSAGE + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + request_ = handler->Deserialize(call_.call(), request_payload_, + &request_status_); + + request_payload_ = nullptr; + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + interceptor_methods_.SetRecvMessage(request_); + } - internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); - cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); + auto f = std::bind(&CallData::ContinueRunAfterInterception, this); + if (interceptor_methods_.RunInterceptors(f)) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } + } - /* Ensure the cq_ is shutdown */ - DummyTag ignored_tag; - GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); + void ContinueRunAfterInterception() { + { + ctx_.BeginCompletionOp(&call_, false); + global_callbacks_->PreSynchronousRequest(&ctx_); + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + handler->RunHandler(internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_, request_status_, nullptr)); + request_ = nullptr; + global_callbacks_->PostSynchronousRequest(&ctx_); + + cq_.Shutdown(); + + internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); + + /* Ensure the cq_ is shutdown */ + DummyTag ignored_tag; + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); + } + delete this; } private: CompletionQueue cq_; - internal::Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; + void* request_; + Status request_status_; internal::RpcServiceMethod* const method_; + internal::Call call_; Server* server_; + std::shared_ptr<GlobalCallbacks> global_callbacks_; + bool resources_; + internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: internal::RpcServiceMethod* const method_; - void* const tag_; + void* const method_tag_; bool in_flight_; const bool has_request_payload_; grpc_call* call_; @@ -272,6 +328,176 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { grpc_completion_queue* cq_; }; +class Server::CallbackRequest final : public internal::CompletionQueueTag { + public: + CallbackRequest(Server* server, internal::RpcServiceMethod* method, + void* method_tag) + : server_(server), + method_(method), + method_tag_(method_tag), + has_request_payload_( + method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING), + cq_(server->CallbackCQ()), + tag_(this) { + Setup(); + } + + ~CallbackRequest() { Clear(); } + + void Request() { + if (method_tag_) { + if (GRPC_CALL_OK != + grpc_server_request_registered_call( + server_->c_server(), method_tag_, &call_, &deadline_, + &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(), + cq_->cq(), static_cast<void*>(&tag_))) { + return; + } + } else { + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + } + if (grpc_server_request_call(server_->c_server(), &call_, call_details_, + &request_metadata_, cq_->cq(), cq_->cq(), + static_cast<void*>(&tag_)) != GRPC_CALL_OK) { + return; + } + } + } + + bool FinalizeResult(void** tag, bool* status) override { return false; } + + private: + class CallbackCallTag : public grpc_experimental_completion_queue_functor { + public: + CallbackCallTag(Server::CallbackRequest* req) : req_(req) { + functor_run = &CallbackCallTag::StaticRun; + } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(bool ok) { Run(ok); } + + private: + Server::CallbackRequest* req_; + internal::Call* call_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = req_; + bool new_ok = ok; + GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == req_); + + if (!ok) { + // The call has been shutdown + req_->Clear(); + return; + } + + // Bind the call, deadline, and metadata from what we got + req_->ctx_.set_call(req_->call_); + req_->ctx_.cq_ = req_->cq_; + req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, + &req_->request_metadata_); + req_->request_metadata_.count = 0; + + // Create a C++ Call to control the underlying core call + call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(internal::Call))) + internal::Call( + req_->call_, req_->server_, req_->cq_, + req_->server_->max_receive_message_size(), + req_->ctx_.set_server_rpc_info( + req_->method_->name(), req_->server_->interceptor_creators_)); + + req_->interceptor_methods_.SetCall(call_); + req_->interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + req_->interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + req_->interceptor_methods_.SetRecvInitialMetadata( + &req_->ctx_.client_metadata_); + + if (req_->has_request_payload_) { + // Set interception point for RECV MESSAGE + req_->request_ = req_->method_->handler()->Deserialize( + req_->call_, req_->request_payload_, &req_->request_status_); + req_->request_payload_ = nullptr; + req_->interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + req_->interceptor_methods_.SetRecvMessage(req_->request_); + } + + if (req_->interceptor_methods_.RunInterceptors( + [this] { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } + } + void ContinueRunAfterInterception() { + req_->ctx_.BeginCompletionOp(call_, true); + req_->method_->handler()->RunHandler( + internal::MethodHandler::HandlerParameter( + call_, &req_->ctx_, req_->request_, req_->request_status_, + [this] { + req_->Reset(); + req_->Request(); + })); + } + }; + + void Reset() { + Clear(); + Setup(); + } + + void Clear() { + if (call_details_) { + delete call_details_; + call_details_ = nullptr; + } + grpc_metadata_array_destroy(&request_metadata_); + if (has_request_payload_ && request_payload_) { + grpc_byte_buffer_destroy(request_payload_); + } + ctx_.Clear(); + interceptor_methods_.ClearState(); + } + + void Setup() { + grpc_metadata_array_init(&request_metadata_); + ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); + request_payload_ = nullptr; + request_ = nullptr; + request_status_ = Status(); + } + + Server* const server_; + internal::RpcServiceMethod* const method_; + void* const method_tag_; + const bool has_request_payload_; + grpc_byte_buffer* request_payload_; + void* request_; + Status request_status_; + grpc_call_details* call_details_ = nullptr; + grpc_call* call_; + gpr_timespec deadline_; + grpc_metadata_array request_metadata_; + CompletionQueue* cq_; + CallbackCallTag tag_; + ServerContext ctx_; + internal::InterceptorBatchMethodsImpl interceptor_methods_; +}; + // Implementation of ThreadManager. Each instance of SyncRequestThreadManager // manages a pool of threads that poll for incoming Sync RPCs and call the // appropriate RPC handlers @@ -318,8 +544,9 @@ class Server::SyncRequestThreadManager : public ThreadManager { } if (ok) { - // Calldata takes ownership of the completion queue inside sync_req - SyncRequest::CallData cd(server_, sync_req); + // Calldata takes ownership of the completion queue and interceptors + // inside sync_req + auto* cd = new SyncRequest::CallData(server_, sync_req); // Prepare for the next request if (!IsShutdown()) { sync_req->SetupRequest(); // Create new completion queue for sync_req @@ -327,7 +554,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_, resources); + cd->Run(global_callbacks_, resources); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -389,8 +616,12 @@ Server::Server( std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs, int min_pollers, int max_pollers, int sync_cq_timeout_msec, - grpc_resource_quota* server_rq) - : max_receive_message_size_(max_receive_message_size), + grpc_resource_quota* server_rq, + std::vector< + std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> + interceptor_creators) + : interceptor_creators_(std::move(interceptor_creators)), + max_receive_message_size_(max_receive_message_size), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), shutdown_(false), @@ -446,6 +677,9 @@ Server::Server( Server::~Server() { { std::unique_lock<std::mutex> lock(mu_); + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + } if (started_ && !shutdown_) { lock.unlock(); Shutdown(); @@ -518,21 +752,28 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { } internal::RpcServiceMethod* method = it->get(); - void* tag = grpc_server_register_method( + void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); - if (tag == nullptr) { + if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - if (method->handler() == nullptr) { // Async method - method->set_server_tag(tag); - } else { + if (method->handler() == nullptr) { // Async method without handler + method->set_server_tag(method_registration_tag); + } else if (method->api_type() == + internal::RpcServiceMethod::ApiType::SYNC) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { - (*it)->AddSyncMethod(method, tag); + (*it)->AddSyncMethod(method, method_registration_tag); } + } else { + // a callback method + auto* req = new CallbackRequest(this, method, method_registration_tag); + callback_reqs_.emplace_back(req); + // Enqueue it so that it will be Request'ed later once + // all request matchers are created at core server startup } method_name = method->name(); @@ -583,7 +824,8 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // performance. This ensures that we don't introduce thread hops // for application requests that wind up on this CQ, which is polled // in its own thread. - health_check_cq = new ServerCompletionQueue(GRPC_CQ_NON_POLLING); + health_check_cq = + new ServerCompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); grpc_server_register_completion_queue(server_, health_check_cq->cq(), nullptr); default_health_check_service_impl = @@ -620,6 +862,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { (*it)->Start(); } + for (auto& cbreq : callback_reqs_) { + cbreq->Request(); + } + if (default_health_check_service_impl != nullptr) { default_health_check_service_impl->StartServingThread(); } @@ -681,31 +927,27 @@ void Server::Wait() { void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, internal::Call* call) { - static const size_t MAX_OPS = 8; - size_t nops = 0; - grpc_op cops[MAX_OPS]; - ops->FillOps(call->call(), cops, &nops); - auto result = - grpc_call_start_batch(call->call(), cops, nops, ops->cq_tag(), nullptr); - if (result != GRPC_CALL_OK) { - gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result); - grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR, - call->call(), cops, nops, ops); - abort(); - } + ops->FillOps(call); } ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - void* tag, bool delete_on_finalize) + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : server_(server), context_(context), stream_(stream), call_cq_(call_cq), + notification_cq_(notification_cq), tag_(tag), delete_on_finalize_(delete_on_finalize), - call_(nullptr) { + call_(nullptr), + done_intercepting_(false) { + /* Set up interception state partially for the receive ops. call_wrapper_ is + * not filled at this point, but it will be filled before the interceptors are + * run. */ + interceptor_methods_.SetCall(&call_wrapper_); + interceptor_methods_.SetReverse(); call_cq_->RegisterAvalanching(); // This op will trigger more ops } @@ -715,15 +957,45 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { + if (done_intercepting_) { + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; + } context_->set_call(call_); context_->cq_ = call_cq_; - internal::Call call(call_, server_, call_cq_, - server_->max_receive_message_size()); - if (*status && call_) { - context_->BeginCompletionOp(&call); + if (call_wrapper_.call() == nullptr) { + // Fill it since it is empty. + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); } + // just the pointers inside call are copied here - stream_->BindCall(&call); + stream_->BindCall(&call_wrapper_); + + if (*status && call_ && call_wrapper_.server_rpc_info()) { + done_intercepting_ = true; + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); + auto f = std::bind(&ServerInterface::BaseAsyncRequest:: + ContinueFinalizeResultAfterInterception, + this); + if (interceptor_methods_.RunInterceptors(f)) { + // There are no interceptors to run. Continue + } else { + // There were interceptors to be run, so + // ContinueFinalizeResultAfterInterception will be run when interceptors + // are done. + return false; + } + } + if (*status && call_) { + context_->BeginCompletionOp(&call_wrapper_, false); + } *tag = tag_; if (delete_on_finalize_) { delete this; @@ -731,11 +1003,25 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, return true; } +void ServerInterface::BaseAsyncRequest:: + ContinueFinalizeResultAfterInterception() { + context_->BeginCompletionOp(&call_wrapper_, false); + // Queue a tag which will be returned immediately + grpc_core::ExecCtx exec_ctx; + grpc_cq_begin_op(notification_cq_->cq(), this); + grpc_cq_end_op( + notification_cq_->cq(), this, GRPC_ERROR_NONE, + [](void* arg, grpc_cq_completion* completion) { delete completion; }, + nullptr, new grpc_cq_completion()); +} + ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} + ServerCompletionQueue* notification_cq, void* tag, const char* name) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + true), + name_(name) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -751,7 +1037,7 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : BaseAsyncRequest(server, context, stream, call_cq, tag, + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, delete_on_finalize) { grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); @@ -764,6 +1050,10 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest( bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { + // If we are done intercepting, there is nothing more for us to do + if (done_intercepting_) { + return BaseAsyncRequest::FinalizeResult(tag, status); + } // TODO(yangg) remove the copy here. if (*status) { static_cast<GenericServerContext*>(context_)->method_ = @@ -774,16 +1064,26 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, } grpc_slice_unref(call_details_.method); grpc_slice_unref(call_details_.host); + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), + context_->set_server_rpc_info( + static_cast<GenericServerContext*>(context_)->method_.c_str(), + *server_->interceptor_creators())); return BaseAsyncRequest::FinalizeResult(tag, status); } bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, bool* status) { - if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { - new UnimplementedAsyncRequest(server_, cq_); - new UnimplementedAsyncResponse(this); + if (GenericAsyncRequest::FinalizeResult(tag, status)) { + // We either had no interceptors run or we are done intercepting + if (*status) { + new UnimplementedAsyncRequest(server_, cq_); + new UnimplementedAsyncResponse(this); + } else { + delete this; + } } else { - delete this; + // The tag was swallowed due to interception. We will see it again. } return false; } @@ -798,4 +1098,41 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( ServerInitializer* Server::initializer() { return server_initializer_.get(); } +namespace { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { + public: + ShutdownCallback() { functor_run = &ShutdownCallback::Run; } + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast<ShutdownCallback*>(cb); + delete callback->cq_; + delete callback; + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + +CompletionQueue* Server::CallbackCQ() { + // TODO(vjpai): Consider using a single global CQ for the default CQ + // if there is no explicit per-server CQ registered + std::lock_guard<std::mutex> l(mu_); + if (callback_cq_ == nullptr) { + auto* shutdown_callback = new ShutdownCallback; + callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + } + return callback_cq_; +}; + } // namespace grpc diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index bd532a968d..355debb3fb 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -41,13 +41,29 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq // must ref the call before calling constructor and after deleting this - CompletionOp(grpc_call* call) - : call_(call), + CompletionOp(internal::Call* call) + : call_(*call), has_tag_(false), tag_(nullptr), + core_cq_tag_(this), refs_(2), finalized_(false), - cancelled_(0) {} + cancelled_(0), + done_intercepting_(false) {} + + // CompletionOp isn't copyable or movable + CompletionOp(const CompletionOp&) = delete; + CompletionOp& operator=(const CompletionOp&) = delete; + CompletionOp(CompletionOp&&) = delete; + CompletionOp& operator=(CompletionOp&&) = delete; + + ~CompletionOp() { + if (call_.server_rpc_info()) { + call_.server_rpc_info()->Unref(); + } + } + + void FillOps(internal::Call* call) override; // This should always be arena allocated in the call, so override delete. // But this class is not trivially destructible, so must actually call delete @@ -63,7 +79,6 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { // there are no tests catching the compiler warning. static void operator delete(void*, void*) { assert(0); } - void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override; bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -77,107 +92,188 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { tag_ = tag; } - /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API - void* cq_tag() override { return this; } + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } + + void* core_cq_tag() override { return core_cq_tag_; } void Unref(); + // This will be called while interceptors are run if the RPC is a hijacked + // RPC. This should set hijacking state for each of the ops. + void SetHijackingState() override { + /* Servers don't allow hijacking */ + GPR_CODEGEN_ASSERT(false); + } + + /* Should be called after interceptors are done running */ + void ContinueFillOpsAfterInterception() override {} + + /* Should be called after interceptors are done running on the finalize result + * path */ + void ContinueFinalizeResultAfterInterception() override { + done_intercepting_ = true; + if (!has_tag_) { + /* We don't have a tag to return. */ + std::unique_lock<std::mutex> lock(mu_); + if (--refs_ == 0) { + lock.unlock(); + grpc_call* call = call_.call(); + delete this; + grpc_call_unref(call); + } + return; + } + /* Start a dummy op so that we can return the tag */ + GPR_CODEGEN_ASSERT(GRPC_CALL_OK == + g_core_codegen_interface->grpc_call_start_batch( + call_.call(), nullptr, 0, this, nullptr)); + } + private: bool CheckCancelledNoPluck() { std::lock_guard<std::mutex> g(mu_); return finalized_ ? (cancelled_ != 0) : false; } - grpc_call* call_; + internal::Call call_; bool has_tag_; void* tag_; + void* core_cq_tag_; std::mutex mu_; int refs_; bool finalized_; int cancelled_; + bool done_intercepting_; + internal::InterceptorBatchMethodsImpl interceptor_methods_; }; void ServerContext::CompletionOp::Unref() { std::unique_lock<std::mutex> lock(mu_); if (--refs_ == 0) { lock.unlock(); - // Save aside the call pointer before deleting for later unref - grpc_call* call = call_; + grpc_call* call = call_.call(); delete this; grpc_call_unref(call); } } -void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops, - size_t* nops) { - ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops->data.recv_close_on_server.cancelled = &cancelled_; - ops->flags = 0; - ops->reserved = nullptr; - *nops = 1; +void ServerContext::CompletionOp::FillOps(internal::Call* call) { + grpc_op ops; + ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops.data.recv_close_on_server.cancelled = &cancelled_; + ops.flags = 0; + ops.reserved = nullptr; + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + interceptor_methods_.SetCallOpSetInterface(this); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1, + core_cq_tag_, nullptr)); + /* No interceptors to run here */ } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { - std::unique_lock<std::mutex> lock(mu_); - finalized_ = true; bool ret = false; - if (has_tag_) { - *tag = tag_; - ret = true; + std::unique_lock<std::mutex> lock(mu_); + if (done_intercepting_) { + /* We are done intercepting. */ + if (has_tag_) { + *tag = tag_; + ret = true; + } + if (--refs_ == 0) { + lock.unlock(); + grpc_call* call = call_.call(); + delete this; + grpc_call_unref(call); + } + return ret; } + finalized_ = true; + if (!*status) cancelled_ = 1; - if (--refs_ == 0) { - lock.unlock(); - // Save aside the call pointer before deleting for later unref - grpc_call* call = call_; - delete this; - grpc_call_unref(call); + /* Release the lock since we are going to be running through interceptors now + */ + lock.unlock(); + /* Add interception point and run through interceptors */ + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_CLOSE); + if (interceptor_methods_.RunInterceptors()) { + /* No interceptors were run */ + if (has_tag_) { + *tag = tag_; + ret = true; + } + lock.lock(); + if (--refs_ == 0) { + lock.unlock(); + grpc_call* call = call_.call(); + delete this; + grpc_call_unref(call); + } + return ret; } - return ret; + /* There are interceptors to be run. Return false for now */ + return false; } // ServerContext body -ServerContext::ServerContext() - : completion_op_(nullptr), - has_notify_when_done_tag_(false), - async_notify_when_done_tag_(nullptr), - deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), - call_(nullptr), - cq_(nullptr), - sent_initial_metadata_(false), - compression_level_set_(false), - has_pending_ops_(false) {} - -ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) - : completion_op_(nullptr), - has_notify_when_done_tag_(false), - async_notify_when_done_tag_(nullptr), - deadline_(deadline), - call_(nullptr), - cq_(nullptr), - sent_initial_metadata_(false), - compression_level_set_(false), - has_pending_ops_(false) { +ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); } + +ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) { + Setup(deadline); + std::swap(*client_metadata_.arr(), *arr); +} + +void ServerContext::Setup(gpr_timespec deadline) { + completion_op_ = nullptr; + has_notify_when_done_tag_ = false; + async_notify_when_done_tag_ = nullptr; + deadline_ = deadline; + call_ = nullptr; + cq_ = nullptr; + sent_initial_metadata_ = false; + compression_level_set_ = false; + has_pending_ops_ = false; + rpc_info_ = nullptr; +} + +void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline, + grpc_metadata_array* arr) { + deadline_ = deadline; std::swap(*client_metadata_.arr(), *arr); } -ServerContext::~ServerContext() { +ServerContext::~ServerContext() { Clear(); } + +void ServerContext::Clear() { if (call_) { grpc_call_unref(call_); } if (completion_op_) { completion_op_->Unref(); } + if (rpc_info_) { + rpc_info_->Unref(); + } + // Don't need to clear out call_, completion_op_, or rpc_info_ because this is + // either called from destructor or just before Setup } -void ServerContext::BeginCompletionOp(internal::Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { GPR_ASSERT(!completion_op_); + if (rpc_info_) { + rpc_info_->Ref(); + } grpc_call_ref(call->call()); completion_op_ = new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) - CompletionOp(call->call()); - if (has_notify_when_done_tag_) { + CompletionOp(call); + if (callback) { + completion_tag_ = + internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_); + completion_op_->set_core_cq_tag(&completion_tag_); + } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } call->PerformOps(completion_op_); @@ -206,12 +302,15 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - if (has_notify_when_done_tag_) { - // when using async API, but the result is only valid + if (completion_tag_) { + // When using callback API, this result is always valid. + return completion_op_->CheckCancelledAsync(); + } else if (has_notify_when_done_tag_) { + // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { - // when using sync API + // when using sync API, the result is always valid return completion_op_ && completion_op_->CheckCancelled(cq_); } } diff --git a/src/proto/grpc/channelz/channelz.proto b/src/proto/grpc/channelz/channelz.proto index d930dfcfb4..6202e5e817 100644 --- a/src/proto/grpc/channelz/channelz.proto +++ b/src/proto/grpc/channelz/channelz.proto @@ -424,6 +424,8 @@ service Channelz { rpc GetTopChannels(GetTopChannelsRequest) returns (GetTopChannelsResponse); // Gets all servers that exist in the process. rpc GetServers(GetServersRequest) returns (GetServersResponse); + // Returns a single Server, or else a NOT_FOUND code. + rpc GetServer(GetServerRequest) returns (GetServerResponse); // Gets all server sockets that exist in the process. rpc GetServerSockets(GetServerSocketsRequest) returns (GetServerSocketsResponse); // Returns a single Channel, or else a NOT_FOUND code. @@ -466,6 +468,17 @@ message GetServersResponse { bool end = 2; } +message GetServerRequest { + // server_id is the identifier of the specific server to get. + int64 server_id = 1; +} + +message GetServerResponse { + // The Server that corresponds to the requested server_id. This field + // should be set. + Server server = 1; +} + message GetServerSocketsRequest { int64 server_id = 1; // start_socket_id indicates that only sockets at or above this id should be diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 0a3097111f..3cb0eb179e 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -274,8 +274,14 @@ class BuildExt(build_ext.build_ext): extra_defines = [ 'EXTRA_DEFINES="GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1"' ] + # Ensure the BoringSSL are built instead of using system provided + # libraries. It prevents dependency issues while distributing to + # Mac users who use MacPorts to manage their libraries. #17002 + mod_env = dict(os.environ) + mod_env['REQUIRE_CUSTOM_LIBRARIES_opt'] = '1' make_process = subprocess.Popen( ['make'] + extra_defines + targets, + env=mod_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) make_out, make_err = make_process.communicate() diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 0e192b6201..18245e9107 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -98,6 +98,7 @@ grpc_resource_quota_set_max_threads_type grpc_resource_quota_set_max_threads_imp grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import; grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import; grpc_channelz_get_servers_type grpc_channelz_get_servers_import; +grpc_channelz_get_server_type grpc_channelz_get_server_import; grpc_channelz_get_server_sockets_type grpc_channelz_get_server_sockets_import; grpc_channelz_get_channel_type grpc_channelz_get_channel_import; grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import; @@ -355,6 +356,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable"); grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels"); grpc_channelz_get_servers_import = (grpc_channelz_get_servers_type) GetProcAddress(library, "grpc_channelz_get_servers"); + grpc_channelz_get_server_import = (grpc_channelz_get_server_type) GetProcAddress(library, "grpc_channelz_get_server"); grpc_channelz_get_server_sockets_import = (grpc_channelz_get_server_sockets_type) GetProcAddress(library, "grpc_channelz_get_server_sockets"); grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel"); grpc_channelz_get_subchannel_import = (grpc_channelz_get_subchannel_type) GetProcAddress(library, "grpc_channelz_get_subchannel"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index e075db89e8..56d222c7ec 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -269,6 +269,9 @@ extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import typedef char*(*grpc_channelz_get_servers_type)(intptr_t start_server_id); extern grpc_channelz_get_servers_type grpc_channelz_get_servers_import; #define grpc_channelz_get_servers grpc_channelz_get_servers_import +typedef char*(*grpc_channelz_get_server_type)(intptr_t server_id); +extern grpc_channelz_get_server_type grpc_channelz_get_server_import; +#define grpc_channelz_get_server grpc_channelz_get_server_import typedef char*(*grpc_channelz_get_server_sockets_type)(intptr_t server_id, intptr_t start_socket_id); extern grpc_channelz_get_server_sockets_type grpc_channelz_get_server_sockets_import; #define grpc_channelz_get_server_sockets grpc_channelz_get_server_sockets_import |