diff options
author | hcaseyal <hcaseyal@gmail.com> | 2018-12-03 10:16:35 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-03 10:16:35 -0800 |
commit | cf14bd13fbbd2d0069db35de51b940d9f0a458f2 (patch) | |
tree | a6bcfc779b60b4ea436f04875bdc7658046b0080 /src | |
parent | c12aabc6a7d67e9f786125480ed6b17e25278c98 (diff) | |
parent | 1e1d4c262134c2641329a42f262c1687d8113f0e (diff) |
Merge branch 'master' into callback_test_coverage_1
Diffstat (limited to 'src')
88 files changed, 1466 insertions, 177 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 7986aca696..b004687250 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -132,6 +132,7 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file, "grpcpp/impl/codegen/async_generic_service.h", "grpcpp/impl/codegen/async_stream.h", "grpcpp/impl/codegen/async_unary_call.h", + "grpcpp/impl/codegen/client_callback.h", "grpcpp/impl/codegen/method_handler_impl.h", "grpcpp/impl/codegen/proto_utils.h", "grpcpp/impl/codegen/rpc_method.h", @@ -580,11 +581,22 @@ void PrintHeaderClientMethodCallbackInterfaces( "const $Request$* request, $Response$* response, " "std::function<void(::grpc::Status)>) = 0;\n"); } else if (ClientOnlyStreaming(method)) { - // TODO(vjpai): Add support for client-side streaming + printer->Print(*vars, + "virtual void $Method$(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::experimental::ClientWriteReactor< $Request$>* " + "reactor) = 0;\n"); } else if (ServerOnlyStreaming(method)) { - // TODO(vjpai): Add support for server-side streaming + printer->Print(*vars, + "virtual void $Method$(::grpc::ClientContext* context, " + "$Request$* request, " + "::grpc::experimental::ClientReadReactor< $Response$>* " + "reactor) = 0;\n"); } else if (method->BidiStreaming()) { - // TODO(vjpai): Add support for bidi streaming + printer->Print(*vars, + "virtual void $Method$(::grpc::ClientContext* context, " + "::grpc::experimental::ClientBidiReactor< " + "$Request$,$Response$>* reactor) = 0;\n"); } } @@ -631,11 +643,23 @@ void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer, "const $Request$* request, $Response$* response, " "std::function<void(::grpc::Status)>) override;\n"); } else if (ClientOnlyStreaming(method)) { - // TODO(vjpai): Add support for client-side streaming + printer->Print(*vars, + "void $Method$(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::experimental::ClientWriteReactor< $Request$>* " + "reactor) override;\n"); } else if (ServerOnlyStreaming(method)) { - // TODO(vjpai): Add support for server-side streaming + printer->Print(*vars, + "void $Method$(::grpc::ClientContext* context, " + "$Request$* request, " + "::grpc::experimental::ClientReadReactor< $Response$>* " + "reactor) override;\n"); + } else if (method->BidiStreaming()) { - // TODO(vjpai): Add support for bidi streaming + printer->Print(*vars, + "void $Method$(::grpc::ClientContext* context, " + "::grpc::experimental::ClientBidiReactor< " + "$Request$,$Response$>* reactor) override;\n"); } } @@ -865,6 +889,11 @@ void PrintHeaderServerCallbackMethodsHelper( " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); + printer->Print(*vars, + "virtual ::grpc::experimental::ServerReadReactor< " + "$RealRequest$, $RealResponse$>* $Method$() {\n" + " return new ::grpc::internal::UnimplementedReadReactor<\n" + " $RealRequest$, $RealResponse$>;}\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -876,6 +905,11 @@ void PrintHeaderServerCallbackMethodsHelper( " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); + printer->Print(*vars, + "virtual ::grpc::experimental::ServerWriteReactor< " + "$RealRequest$, $RealResponse$>* $Method$() {\n" + " return new ::grpc::internal::UnimplementedWriteReactor<\n" + " $RealRequest$, $RealResponse$>;}\n"); } else if (method->BidiStreaming()) { printer->Print( *vars, @@ -887,6 +921,11 @@ void PrintHeaderServerCallbackMethodsHelper( " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); + printer->Print(*vars, + "virtual ::grpc::experimental::ServerBidiReactor< " + "$RealRequest$, $RealResponse$>* $Method$() {\n" + " return new ::grpc::internal::UnimplementedBidiReactor<\n" + " $RealRequest$, $RealResponse$>;}\n"); } } @@ -915,22 +954,36 @@ void PrintHeaderServerMethodCallback( *vars, " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n" " new ::grpc::internal::CallbackUnaryHandler< " - "ExperimentalWithCallbackMethod_$Method$<BaseClass>, $RealRequest$, " - "$RealResponse$>(\n" + "$RealRequest$, $RealResponse$>(\n" " [this](::grpc::ServerContext* context,\n" " const $RealRequest$* request,\n" " $RealResponse$* response,\n" " ::grpc::experimental::ServerCallbackRpcController* " "controller) {\n" - " this->$" + " return this->$" "Method$(context, request, response, controller);\n" - " }, this));\n"); + " }));\n"); } else if (ClientOnlyStreaming(method)) { - // TODO(vjpai): Add in code generation for all streaming methods + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n" + " new ::grpc::internal::CallbackClientStreamingHandler< " + "$RealRequest$, $RealResponse$>(\n" + " [this] { return this->$Method$(); }));\n"); } else if (ServerOnlyStreaming(method)) { - // TODO(vjpai): Add in code generation for all streaming methods + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n" + " new ::grpc::internal::CallbackServerStreamingHandler< " + "$RealRequest$, $RealResponse$>(\n" + " [this] { return this->$Method$(); }));\n"); } else if (method->BidiStreaming()) { - // TODO(vjpai): Add in code generation for all streaming methods + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n" + " new ::grpc::internal::CallbackBidiHandler< " + "$RealRequest$, $RealResponse$>(\n" + " [this] { return this->$Method$(); }));\n"); } printer->Print(*vars, "}\n"); printer->Print(*vars, @@ -967,8 +1020,7 @@ void PrintHeaderServerMethodRawCallback( *vars, " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n" " new ::grpc::internal::CallbackUnaryHandler< " - "ExperimentalWithRawCallbackMethod_$Method$<BaseClass>, $RealRequest$, " - "$RealResponse$>(\n" + "$RealRequest$, $RealResponse$>(\n" " [this](::grpc::ServerContext* context,\n" " const $RealRequest$* request,\n" " $RealResponse$* response,\n" @@ -976,13 +1028,28 @@ void PrintHeaderServerMethodRawCallback( "controller) {\n" " this->$" "Method$(context, request, response, controller);\n" - " }, this));\n"); + " }));\n"); } else if (ClientOnlyStreaming(method)) { - // TODO(vjpai): Add in code generation for all streaming methods + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n" + " new ::grpc::internal::CallbackClientStreamingHandler< " + "$RealRequest$, $RealResponse$>(\n" + " [this] { return this->$Method$(); }));\n"); } else if (ServerOnlyStreaming(method)) { - // TODO(vjpai): Add in code generation for all streaming methods + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n" + " new ::grpc::internal::CallbackServerStreamingHandler< " + "$RealRequest$, $RealResponse$>(\n" + " [this] { return this->$Method$(); }));\n"); } else if (method->BidiStreaming()) { - // TODO(vjpai): Add in code generation for all streaming methods + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n" + " new ::grpc::internal::CallbackBidiHandler< " + "$RealRequest$, $RealResponse$>(\n" + " [this] { return this->$Method$(); }));\n"); } printer->Print(*vars, "}\n"); printer->Print(*vars, @@ -1607,7 +1674,19 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "context, response);\n" "}\n\n"); - // TODO(vjpai): Add callback version + printer->Print( + *vars, + "void $ns$$Service$::" + "Stub::experimental_async::$Method$(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::experimental::ClientWriteReactor< $Request$>* reactor) {\n"); + printer->Print(*vars, + " ::grpc::internal::ClientCallbackWriterFactory< " + "$Request$>::Create(" + "stub_->channel_.get(), " + "stub_->rpcmethod_$Method$_, " + "context, response, reactor);\n" + "}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; @@ -1641,7 +1720,19 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "context, request);\n" "}\n\n"); - // TODO(vjpai): Add callback version + printer->Print( + *vars, + "void $ns$$Service$::Stub::experimental_async::$Method$(::grpc::" + "ClientContext* context, " + "$Request$* request, " + "::grpc::experimental::ClientReadReactor< $Response$>* reactor) {\n"); + printer->Print(*vars, + " ::grpc::internal::ClientCallbackReaderFactory< " + "$Response$>::Create(" + "stub_->channel_.get(), " + "stub_->rpcmethod_$Method$_, " + "context, request, reactor);\n" + "}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; @@ -1675,7 +1766,19 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "context);\n" "}\n\n"); - // TODO(vjpai): Add callback version + printer->Print( + *vars, + "void $ns$$Service$::Stub::experimental_async::$Method$(::grpc::" + "ClientContext* context, " + "::grpc::experimental::ClientBidiReactor< $Request$,$Response$>* " + "reactor) {\n"); + printer->Print(*vars, + " ::grpc::internal::ClientCallbackReaderWriterFactory< " + "$Request$,$Response$>::Create(" + "stub_->channel_.get(), " + "stub_->rpcmethod_$Method$_, " + "context, reactor);\n" + "}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index dc0e1f89ce..7e70f1c28b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -749,7 +749,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); - // Empty payload means the LB call was cancelled. + // Null payload means the LB call was cancelled. if (lb_calld != grpclb_policy->lb_calld_.get() || lb_calld->recv_message_payload_ == nullptr) { lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); @@ -803,54 +803,45 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( gpr_free(ipport); } } - /* update serverlist */ - if (serverlist->num_servers > 0) { - // Start sending client load report only after we start using the - // serverlist returned from the current LB call. - if (lb_calld->client_stats_report_interval_ > 0 && - lb_calld->client_stats_ == nullptr) { - lb_calld->client_stats_.reset(New<GrpcLbClientStats>()); - // TODO(roth): We currently track this ref manually. Once the - // ClosureRef API is ready, we should pass the RefCountedPtr<> along - // with the callback. - auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); - self.release(); - lb_calld->ScheduleNextClientLoadReportLocked(); - } - if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, - serverlist)) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Incoming server list identical to current, " - "ignoring.", - grpclb_policy); - } - grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (grpclb_policy->serverlist_ != nullptr) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); - } else { - /* or dispose of the fallback */ - grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); - grpclb_policy->fallback_backend_addresses_ = nullptr; - if (grpclb_policy->fallback_timer_callback_pending_) { - grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); - } - } - // and update the copy in the GrpcLb instance. This - // serverlist instance will be destroyed either upon the next - // update or when the GrpcLb instance is destroyed. - grpclb_policy->serverlist_ = serverlist; - grpclb_policy->serverlist_index_ = 0; - grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); - } - } else { + // Start sending client load report only after we start using the + // serverlist returned from the current LB call. + if (lb_calld->client_stats_report_interval_ > 0 && + lb_calld->client_stats_ == nullptr) { + lb_calld->client_stats_.reset(New<GrpcLbClientStats>()); + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); + self.release(); + lb_calld->ScheduleNextClientLoadReportLocked(); + } + // Check if the serverlist differs from the previous one. + if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) { if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", + gpr_log(GPR_INFO, + "[grpclb %p] Incoming server list identical to current, " + "ignoring.", grpclb_policy); } grpc_grpclb_destroy_serverlist(serverlist); + } else { // New serverlist. + if (grpclb_policy->serverlist_ != nullptr) { + // Dispose of the old serverlist. + grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); + } else { + // Dispose of the fallback. + grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); + grpclb_policy->fallback_backend_addresses_ = nullptr; + if (grpclb_policy->fallback_timer_callback_pending_) { + grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); + } + } + // Update the serverlist in the GrpcLb instance. This serverlist + // instance will be destroyed either upon the next update or when the + // GrpcLb instance is destroyed. + grpclb_policy->serverlist_ = serverlist; + grpclb_policy->serverlist_index_ = 0; + grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); } } else { // No valid initial response or serverlist found. @@ -1583,7 +1574,7 @@ void GrpcLb::AddPendingPick(PendingPick* pp) { bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, grpc_error** error) { // Check for drops if we are not using fallback backend addresses. - if (serverlist_ != nullptr) { + if (serverlist_ != nullptr && serverlist_->num_servers > 0) { // Look at the index into the serverlist to see if we should drop this call. grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++]; if (serverlist_index_ == serverlist_->num_servers) { @@ -1681,7 +1672,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { grpc_lb_addresses* addresses; bool is_backend_from_grpclb_load_balancer = false; if (serverlist_ != nullptr) { - GPR_ASSERT(serverlist_->num_servers > 0); addresses = ProcessServerlist(serverlist_); is_backend_from_grpclb_load_balancer = true; } else { diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 90bc88961d..4ebc2c8161 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -122,6 +122,8 @@ class AresDnsResolver : public Resolver { char* service_config_json_ = nullptr; // has shutdown been initiated bool shutdown_initiated_ = false; + // timeout in milliseconds for active DNS queries + int query_timeout_ms_; }; AresDnsResolver::AresDnsResolver(const ResolverArgs& args) @@ -159,6 +161,11 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args) grpc_combiner_scheduler(combiner())); GRPC_CLOSURE_INIT(&on_resolved_, OnResolvedLocked, this, grpc_combiner_scheduler(combiner())); + const grpc_arg* query_timeout_ms_arg = + grpc_channel_args_find(channel_args_, GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS); + query_timeout_ms_ = grpc_channel_arg_get_integer( + query_timeout_ms_arg, + {GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, 0, INT_MAX}); } AresDnsResolver::~AresDnsResolver() { @@ -410,7 +417,8 @@ void AresDnsResolver::StartResolvingLocked() { pending_request_ = grpc_dns_lookup_ares_locked( dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, &on_resolved_, &lb_addresses_, true /* check_grpclb */, - request_service_config_ ? &service_config_json_ : nullptr, combiner()); + request_service_config_ ? &service_config_json_ : nullptr, + query_timeout_ms_, combiner()); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc index fdbd07ebf5..f42b1e309d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -33,6 +33,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/timer.h" typedef struct fd_node { /** the owner of this fd node */ @@ -76,6 +77,12 @@ struct grpc_ares_ev_driver { grpc_ares_request* request; /** Owned by the ev_driver. Creates new GrpcPolledFd's */ grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory; + /** query timeout in milliseconds */ + int query_timeout_ms; + /** alarm to cancel active queries */ + grpc_timer query_timeout; + /** cancels queries on a timeout */ + grpc_closure on_timeout_locked; }; static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); @@ -116,8 +123,11 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { } } +static void on_timeout_locked(void* arg, grpc_error* error); + grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, + int query_timeout_ms, grpc_combiner* combiner, grpc_ares_request* request) { *ev_driver = grpc_core::New<grpc_ares_ev_driver>(); @@ -146,6 +156,9 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner); (*ev_driver) ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel); + GRPC_CLOSURE_INIT(&(*ev_driver)->on_timeout_locked, on_timeout_locked, + *ev_driver, grpc_combiner_scheduler(combiner)); + (*ev_driver)->query_timeout_ms = query_timeout_ms; return GRPC_ERROR_NONE; } @@ -155,6 +168,7 @@ void grpc_ares_ev_driver_on_queries_complete_locked( // is working, grpc_ares_notify_on_event_locked will shut down the // fds; if it's not working, there are no fds to shut down. ev_driver->shutting_down = true; + grpc_timer_cancel(&ev_driver->query_timeout); grpc_ares_ev_driver_unref(ev_driver); } @@ -185,6 +199,17 @@ static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { return nullptr; } +static void on_timeout_locked(void* arg, grpc_error* error) { + grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg); + GRPC_CARES_TRACE_LOG( + "ev_driver=%p on_timeout_locked. driver->shutting_down=%d. err=%s", + driver, driver->shutting_down, grpc_error_string(error)); + if (!driver->shutting_down && error == GRPC_ERROR_NONE) { + grpc_ares_ev_driver_shutdown_locked(driver); + } + grpc_ares_ev_driver_unref(driver); +} + static void on_readable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; @@ -314,6 +339,17 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { if (!ev_driver->working) { ev_driver->working = true; grpc_ares_notify_on_event_locked(ev_driver); + grpc_millis timeout = + ev_driver->query_timeout_ms == 0 + ? GRPC_MILLIS_INF_FUTURE + : ev_driver->query_timeout_ms + grpc_core::ExecCtx::Get()->Now(); + GRPC_CARES_TRACE_LOG( + "ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in %" PRId64 + " ms", + ev_driver, timeout); + grpc_ares_ev_driver_ref(ev_driver); + grpc_timer_init(&ev_driver->query_timeout, timeout, + &ev_driver->on_timeout_locked); } } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 671c537fe7..b8cefd9470 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -43,6 +43,7 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked( created successfully. */ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, + int query_timeout_ms, grpc_combiner* combiner, grpc_ares_request* request); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 582e2203fc..55715869b6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -359,7 +359,7 @@ done: void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( grpc_ares_request* r, const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, - bool check_grpclb, grpc_combiner* combiner) { + bool check_grpclb, int query_timeout_ms, grpc_combiner* combiner) { grpc_error* error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; ares_channel* channel = nullptr; @@ -388,7 +388,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( port = gpr_strdup(default_port); } error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties, - combiner, r); + query_timeout_ms, combiner, r); if (error != GRPC_ERROR_NONE) goto error_cleanup; channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver); // If dns_server is specified, use it. @@ -522,7 +522,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, - grpc_combiner* combiner) { + int query_timeout_ms, grpc_combiner* combiner) { grpc_ares_request* r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); r->ev_driver = nullptr; @@ -546,7 +546,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( // Look up name using c-ares lib. grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( r, dns_server, name, default_port, interested_parties, check_grpclb, - combiner); + query_timeout_ms, combiner); return r; } @@ -554,6 +554,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + int query_timeout_ms, grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { @@ -648,7 +649,8 @@ static void grpc_resolve_address_invoke_dns_lookup_ares_locked( r->ares_request = grpc_dns_lookup_ares_locked( nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, &r->on_dns_lookup_done_locked, &r->lb_addrs, false /* check_grpclb */, - nullptr /* service_config_json */, r->combiner); + nullptr /* service_config_json */, GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, + r->combiner); } static void grpc_resolve_address_ares_impl(const char* name, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index a1231cc4e0..9acef1d0ca 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -26,6 +26,8 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" +#define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 10000 + extern grpc_core::TraceFlag grpc_trace_cares_address_sorting; extern grpc_core::TraceFlag grpc_trace_cares_resolver; @@ -60,7 +62,7 @@ extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addresses, bool check_grpclb, - char** service_config_json, grpc_combiner* combiner); + char** service_config_json, int query_timeout_ms, grpc_combiner* combiner); /* Cancel the pending grpc_ares_request \a request */ extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc index 9f293c1ac0..fc78b18304 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -30,7 +30,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, - grpc_combiner* combiner) { + int query_timeout_ms, grpc_combiner* combiner) { return NULL; } @@ -38,6 +38,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + int query_timeout_ms, grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {} diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index b1b8c0083b..99c675f503 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1390,6 +1390,7 @@ static void perform_stream_op_locked(void* stream_op, GRPC_STATS_INC_HTTP2_OP_BATCHES(); s->context = op->payload->context; + s->traced = op->is_traced; if (grpc_http_trace.enabled()) { char* str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str, diff --git a/src/core/ext/transport/chttp2/transport/context_list.cc b/src/core/ext/transport/chttp2/transport/context_list.cc index 4acd0c9583..f30d41c332 100644 --- a/src/core/ext/transport/chttp2/transport/context_list.cc +++ b/src/core/ext/transport/chttp2/transport/context_list.cc @@ -32,6 +32,7 @@ void ContextList::Execute(void* arg, grpc_core::Timestamps* ts, while (head != nullptr) { if (error == GRPC_ERROR_NONE && ts != nullptr) { if (write_timestamps_callback_g) { + ts->byte_offset = static_cast<uint32_t>(head->byte_offset_); write_timestamps_callback_g(head->s_->context, ts); } } diff --git a/src/core/ext/transport/chttp2/transport/context_list.h b/src/core/ext/transport/chttp2/transport/context_list.h index 68d11e94d8..d870107749 100644 --- a/src/core/ext/transport/chttp2/transport/context_list.h +++ b/src/core/ext/transport/chttp2/transport/context_list.h @@ -50,6 +50,7 @@ class ContextList { /* Create a new element in the list and add it at the front */ ContextList* elem = grpc_core::New<ContextList>(); elem->s_ = s; + elem->byte_offset_ = s->byte_counter; elem->next_ = *head; *head = elem; } @@ -61,6 +62,7 @@ class ContextList { private: grpc_chttp2_stream* s_ = nullptr; ContextList* next_ = nullptr; + size_t byte_offset_ = 0; }; void grpc_http2_set_write_timestamps_callback( diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 1c79cf6353..6aa68f5d4a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -642,10 +642,12 @@ struct grpc_chttp2_stream { /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed */ bool unprocessed_incoming_frames_decompressed = false; - /** gRPC header bytes that are already decompressed */ - size_t decompressed_header_bytes = 0; /** Whether the bytes needs to be traced using Fathom */ bool traced = false; + /** gRPC header bytes that are already decompressed */ + size_t decompressed_header_bytes = 0; + /** Byte counter for number of bytes written */ + size_t byte_counter = 0; }; /** Transport writing call flow: diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 77320b496f..265d3365d3 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -363,6 +363,7 @@ class DataSendContext { grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes, is_last_frame_, &s_->stats.outgoing, &t_->outbuf); s_->flow_control->SentData(send_bytes); + s_->byte_counter += send_bytes; if (s_->compressed_data_buffer.length == 0) { s_->sending_bytes += s_->uncompressed_data_size; } @@ -488,9 +489,6 @@ class StreamWriteContext { return; // early out: nothing to do } - if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) { - grpc_core::ContextList::Append(&t_->cl, s_); - } while ((s_->flow_controlled_buffer.length > 0 || s_->compressed_data_buffer.length > 0) && data_send_context.max_outgoing() > 0) { @@ -500,6 +498,9 @@ class StreamWriteContext { data_send_context.CompressMoreBytes(); } } + if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) { + grpc_core::ContextList::Append(&t_->cl, s_); + } write_context_->ResetPingClock(); if (data_send_context.is_last_frame()) { SentLastFrame(); diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h index 65c2b9634f..66dc751a56 100644 --- a/src/core/lib/gprpp/inlined_vector.h +++ b/src/core/lib/gprpp/inlined_vector.h @@ -100,10 +100,7 @@ class InlinedVector { void reserve(size_t capacity) { if (capacity > capacity_) { T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * capacity)); - for (size_t i = 0; i < size_; ++i) { - new (&new_dynamic[i]) T(std::move(data()[i])); - data()[i].~T(); - } + move_elements(data(), new_dynamic, size_); gpr_free(dynamic_); dynamic_ = new_dynamic; capacity_ = capacity; @@ -131,13 +128,25 @@ class InlinedVector { size_--; } + size_t size() const { return size_; } + bool empty() const { return size_ == 0; } + + size_t capacity() const { return capacity_; } + + void clear() { + destroy_elements(); + init_data(); + } + + private: void copy_from(const InlinedVector& v) { - // if v is allocated, copy over the buffer. + // if v is allocated, make sure we have enough capacity. if (v.dynamic_ != nullptr) { reserve(v.capacity_); - memcpy(dynamic_, v.dynamic_, v.size_ * sizeof(T)); - } else { - memcpy(inline_, v.inline_, v.size_ * sizeof(T)); + } + // copy over elements + for (size_t i = 0; i < v.size_; ++i) { + new (&(data()[i])) T(v[i]); } // copy over metadata size_ = v.size_; @@ -145,11 +154,12 @@ class InlinedVector { } void move_from(InlinedVector& v) { - // if v is allocated, then we steal its buffer, else we copy it. + // if v is allocated, then we steal its dynamic array; otherwise, we + // move the elements individually. if (v.dynamic_ != nullptr) { dynamic_ = v.dynamic_; } else { - memcpy(inline_, v.inline_, v.size_ * sizeof(T)); + move_elements(v.data(), data(), v.size_); } // copy over metadata size_ = v.size_; @@ -158,17 +168,13 @@ class InlinedVector { v.init_data(); } - size_t size() const { return size_; } - bool empty() const { return size_ == 0; } - - size_t capacity() const { return capacity_; } - - void clear() { - destroy_elements(); - init_data(); + static void move_elements(T* src, T* dst, size_t num_elements) { + for (size_t i = 0; i < num_elements; ++i) { + new (&dst[i]) T(std::move(src[i])); + src[i].~T(); + } } - private: void init_data() { dynamic_ = nullptr; size_ = 0; diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc index e20dab15b1..ace17a108d 100644 --- a/src/core/lib/iomgr/buffer_list.cc +++ b/src/core/lib/iomgr/buffer_list.cc @@ -35,6 +35,9 @@ void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg); /* Store the current time as the sendmsg time. */ new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME); + new_elem->ts_.scheduled_time = gpr_inf_past(GPR_CLOCK_REALTIME); + new_elem->ts_.sent_time = gpr_inf_past(GPR_CLOCK_REALTIME); + new_elem->ts_.acked_time = gpr_inf_past(GPR_CLOCK_REALTIME); if (*head == nullptr) { *head = new_elem; return; diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h index 87d74f9ce2..627f1bde99 100644 --- a/src/core/lib/iomgr/buffer_list.h +++ b/src/core/lib/iomgr/buffer_list.h @@ -37,6 +37,8 @@ struct Timestamps { gpr_timespec scheduled_time; gpr_timespec sent_time; gpr_timespec acked_time; + + uint32_t byte_offset; /* byte offset relative to the start of the RPC */ }; /** TracedBuffer is a class to keep track of timestamps for a specific buffer in @@ -73,7 +75,7 @@ class TracedBuffer { private: GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - TracedBuffer(int seq_no, void* arg) + TracedBuffer(uint32_t seq_no, void* arg) : seq_no_(seq_no), arg_(arg), next_(nullptr) {} uint32_t seq_no_; /* The sequence number for the last byte in the buffer */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 38571b1957..4b8c891e9b 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static void shutdown_background_closure(void) {} + static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); @@ -1255,6 +1257,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), true, + false, fd_create, fd_wrapped_fd, @@ -1284,6 +1287,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 06a382c556..7a4870db78 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static void shutdown_background_closure(void) {} + static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); @@ -1612,6 +1614,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), true, + false, fd_create, fd_wrapped_fd, @@ -1641,6 +1644,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 16562538a6..67cbfbbd02 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() { * event engine binding */ +static void shutdown_background_closure(void) {} + static void shutdown_engine(void) { pollset_global_shutdown(); if (grpc_cv_wakeup_fds_enabled()) { @@ -1796,6 +1798,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), false, + false, fd_create, fd_wrapped_fd, @@ -1825,6 +1828,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 6f6d134ced..32d1b6c43e 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -244,6 +244,10 @@ bool grpc_event_engine_can_track_errors(void) { return false; } +bool grpc_event_engine_run_in_background(void) { + return g_event_engine->run_in_background; +} + grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); @@ -395,4 +399,8 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { g_event_engine->pollset_set_del_fd(pollset_set, fd); } +void grpc_shutdown_background_closure(void) { + g_event_engine->shutdown_background_closure(); +} + #endif // GRPC_POSIX_SOCKET_EV diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index b8fb8f534b..812c7a0f0f 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -42,6 +42,7 @@ typedef struct grpc_fd grpc_fd; typedef struct grpc_event_engine_vtable { size_t pollset_size; bool can_track_err; + bool run_in_background; grpc_fd* (*fd_create)(int fd, const char* name, bool track_err); int (*fd_wrapped_fd)(grpc_fd* fd); @@ -79,6 +80,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); + void (*shutdown_background_closure)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; @@ -101,6 +103,11 @@ const char* grpc_get_poll_strategy_name(); */ bool grpc_event_engine_can_track_errors(); +/* Returns true if polling engine runs in the background, false otherwise. + * Currently only 'epollbg' runs in the background. + */ +bool grpc_event_engine_run_in_background(); + /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. \a track_err if true means that error events would be tracked separately @@ -174,6 +181,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd); void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); +/* Shut down all the closures registered in the background poller. */ +void grpc_shutdown_background_closure(); + /* override to allow tests to hook poll() usage */ typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index e957bad73d..05ecd2a49b 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -60,7 +60,7 @@ void grpc_prefork() { } if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 && strcmp(grpc_get_poll_strategy_name(), "poll") != 0) { - gpr_log(GPR_ERROR, + gpr_log(GPR_INFO, "Fork support is only compatible with the epoll1 and poll polling " "strategies"); } diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index fd09a6863b..eb29973514 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -157,6 +157,10 @@ void grpc_iomgr_shutdown() { gpr_cv_destroy(&g_rcv); } +void grpc_iomgr_shutdown_background_closure() { + grpc_iomgr_platform_shutdown_background_closure(); +} + void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) { obj->name = gpr_strdup(name); gpr_mu_lock(&g_mu); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 537ef8a6ff..8ea9289e06 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -35,6 +35,10 @@ void grpc_iomgr_start(); * exec_ctx. */ void grpc_iomgr_shutdown(); +/** Signals the intention to shutdown all the closures registered in the + * background poller. */ +void grpc_iomgr_shutdown_background_closure(); + /* Exposed only for testing */ size_t grpc_iomgr_count_objects_for_testing(); diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index d34c8e7cd1..4b112c9097 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -40,9 +40,11 @@ static void iomgr_platform_init(void) { } static void iomgr_platform_flush(void) {} static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } +static void iomgr_platform_shutdown_background_closure(void) {} static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; + iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_shutdown_background_closure}; void grpc_custom_iomgr_init(grpc_socket_vtable* socket, grpc_custom_resolver_vtable* resolver, diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc index 32dbabb79d..b6c9211865 100644 --- a/src/core/lib/iomgr/iomgr_internal.cc +++ b/src/core/lib/iomgr/iomgr_internal.cc @@ -41,3 +41,7 @@ void grpc_iomgr_platform_init() { iomgr_platform_vtable->init(); } void grpc_iomgr_platform_flush() { iomgr_platform_vtable->flush(); } void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); } + +void grpc_iomgr_platform_shutdown_background_closure() { + iomgr_platform_vtable->shutdown_background_closure(); +} diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index b011d9c7b1..bca7409907 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -35,6 +35,7 @@ typedef struct grpc_iomgr_platform_vtable { void (*init)(void); void (*flush)(void); void (*shutdown)(void); + void (*shutdown_background_closure)(void); } grpc_iomgr_platform_vtable; void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name); @@ -52,6 +53,9 @@ void grpc_iomgr_platform_flush(void); /** tear down all platform specific global iomgr structures */ void grpc_iomgr_platform_shutdown(void); +/** shut down all the closures registered in the background poller */ +void grpc_iomgr_platform_shutdown_background_closure(void); + bool grpc_iomgr_abort_on_leaks(void); #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index ca7334c9a4..9386adf060 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -51,8 +51,13 @@ static void iomgr_platform_shutdown(void) { grpc_wakeup_fd_global_destroy(); } +static void iomgr_platform_shutdown_background_closure(void) { + grpc_shutdown_background_closure(); +} + static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; + iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_shutdown_background_closure}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 235a9e0712..552ef4309c 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -54,8 +54,13 @@ static void iomgr_platform_shutdown(void) { grpc_wakeup_fd_global_destroy(); } +static void iomgr_platform_shutdown_background_closure(void) { + grpc_shutdown_background_closure(); +} + static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; + iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_shutdown_background_closure}; void grpc_set_default_iomgr_platform() { char* enable_cfstream = getenv(grpc_cfstream_env_var); diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index cdef89cbf0..24ef0dba7b 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -71,8 +71,11 @@ static void iomgr_platform_shutdown(void) { winsock_shutdown(); } +static void iomgr_platform_shutdown_background_closure(void) {} + static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; + iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_shutdown_background_closure}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable); diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index 6afe94a7a9..7016ffc31a 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -65,7 +65,7 @@ void grpc_set_resolver_impl(grpc_address_resolver_vtable* vtable); /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. */ -/* TODO(ctiller): add a timeout here */ +/* TODO(apolcyn): add a timeout here */ void grpc_resolve_address(const char* addr, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 4a5ca615e5..cfcb190d60 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -260,10 +260,17 @@ static void notify_on_write(grpc_tcp* tcp) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p notify_on_write", tcp); } - cover_self(tcp); - GRPC_CLOSURE_INIT(&tcp->write_done_closure, - tcp_drop_uncovered_then_handle_write, tcp, - grpc_schedule_on_exec_ctx); + if (grpc_event_engine_run_in_background()) { + // If there is a polling engine always running in the background, there is + // no need to run the backup poller. + GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_handle_write, tcp, + grpc_schedule_on_exec_ctx); + } else { + cover_self(tcp); + GRPC_CLOSURE_INIT(&tcp->write_done_closure, + tcp_drop_uncovered_then_handle_write, tcp, + grpc_schedule_on_exec_ctx); + } grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure); } @@ -627,7 +634,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, if (sending_length == static_cast<size_t>(length)) { gpr_mu_lock(&tcp->tb_mu); grpc_core::TracedBuffer::AddNewEntry( - &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length), + &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length), tcp->outgoing_buffer_arg); gpr_mu_unlock(&tcp->tb_mu); tcp->outgoing_buffer_arg = nullptr; @@ -679,11 +686,9 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, } /** For linux platforms, reads the socket's error queue and processes error - * messages from the queue. Returns true if all the errors processed were - * timestamps. Returns false if any of the errors were not timestamps. For - * non-linux platforms, error processing is not used/enabled currently. + * messages from the queue. */ -static bool process_errors(grpc_tcp* tcp) { +static void process_errors(grpc_tcp* tcp) { while (true) { struct iovec iov; iov.iov_base = nullptr; @@ -712,10 +717,10 @@ static bool process_errors(grpc_tcp* tcp) { } while (r < 0 && saved_errno == EINTR); if (r == -1 && saved_errno == EAGAIN) { - return true; /* No more errors to process */ + return; /* No more errors to process */ } if (r == -1) { - return false; + return; } if (grpc_tcp_trace.enabled()) { if ((msg.msg_flags & MSG_CTRUNC) == 1) { @@ -725,8 +730,9 @@ static bool process_errors(grpc_tcp* tcp) { if (msg.msg_controllen == 0) { /* There was no control message found. It was probably spurious. */ - return true; + return; } + bool seen = false; for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (cmsg->cmsg_level != SOL_SOCKET || @@ -738,9 +744,13 @@ static bool process_errors(grpc_tcp* tcp) { "unknown control message cmsg_level:%d cmsg_type:%d", cmsg->cmsg_level, cmsg->cmsg_type); } - return false; + return; } cmsg = process_timestamp(tcp, &msg, cmsg); + seen = true; + } + if (!seen) { + return; } } } diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index c6198b8ae7..67cf5d89bf 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -161,6 +161,7 @@ void grpc_shutdown(void) { if (--g_initializations == 0) { { grpc_core::ExecCtx exec_ctx(0); + grpc_iomgr_shutdown_background_closure(); { grpc_timer_manager_set_threading( false); // shutdown timer_manager thread diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index cbdb77c844..b32f9c6ec1 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -27,6 +27,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/slice/slice_internal.h" @@ -149,7 +150,7 @@ void grpc_transport_move_stats(grpc_transport_stream_stats* from, } size_t grpc_transport_stream_size(grpc_transport* transport) { - return transport->vtable->sizeof_stream; + return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(transport->vtable->sizeof_stream); } void grpc_transport_destroy(grpc_transport* transport) { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index edfa7030d1..5ce568834e 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -129,7 +129,8 @@ struct grpc_transport_stream_op_batch { recv_initial_metadata(false), recv_message(false), recv_trailing_metadata(false), - cancel_stream(false) {} + cancel_stream(false), + is_traced(false) {} /** Should be scheduled when all of the non-recv operations in the batch are complete. @@ -167,6 +168,9 @@ struct grpc_transport_stream_op_batch { /** Cancel this stream with the provided error */ bool cancel_stream : 1; + /** Is this stream traced */ + bool is_traced : 1; + /*************************************************************************** * remaining fields are initialized and used at the discretion of the * current handler of the op */ diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index 1de6264183..43d0979f4b 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -116,12 +116,13 @@ void alts_handshaker_client_handle_response(alts_handshaker_client* c, "cb is nullptr in alts_tsi_handshaker_handle_response()"); return; } - if (handshaker == nullptr || recv_buffer == nullptr) { + if (handshaker == nullptr) { gpr_log(GPR_ERROR, - "Invalid arguments to alts_tsi_handshaker_handle_response()"); + "handshaker is nullptr in alts_tsi_handshaker_handle_response()"); cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr); return; } + /* TSI handshake has been shutdown. */ if (alts_tsi_handshaker_has_shutdown(handshaker)) { gpr_log(GPR_ERROR, "TSI handshake shutdown"); cb(TSI_HANDSHAKE_SHUTDOWN, user_data, nullptr, 0, nullptr); @@ -133,6 +134,12 @@ void alts_handshaker_client_handle_response(alts_handshaker_client* c, cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr); return; } + if (recv_buffer == nullptr) { + gpr_log(GPR_ERROR, + "recv_buffer is nullptr in alts_tsi_handshaker_handle_response()"); + cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr); + return; + } grpc_gcp_handshaker_resp* resp = alts_tsi_utils_deserialize_response(recv_buffer); grpc_byte_buffer_destroy(client->recv_buffer); diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 87902b26f0..f61c1b5317 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -72,4 +72,13 @@ void GenericStub::experimental_type::UnaryCall( context, request, response, std::move(on_completion)); } +void GenericStub::experimental_type::PrepareBidiStreamingCall( + ClientContext* context, const grpc::string& method, + experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor) { + internal::ClientCallbackReaderWriterFactory<ByteBuffer, ByteBuffer>::Create( + stub_->channel_.get(), + internal::RpcMethod(method.c_str(), internal::RpcMethod::BIDI_STREAMING), + context, reactor); +} + } // namespace grpc diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 0a51cf5626..69af43a656 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -291,7 +291,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void ContinueRunAfterInterception() { { - ctx_.BeginCompletionOp(&call_, false); + ctx_.BeginCompletionOp(&call_, nullptr, nullptr); global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); @@ -456,7 +456,6 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { } } void ContinueRunAfterInterception() { - req_->ctx_.BeginCompletionOp(call_, true); req_->method_->handler()->RunHandler( internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, @@ -1018,7 +1017,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } } if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_, false); + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); } *tag = tag_; if (delete_on_finalize_) { @@ -1029,7 +1028,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, void ServerInterface::BaseAsyncRequest:: ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_, false); + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); // Queue a tag which will be returned immediately grpc_core::ExecCtx exec_ctx; grpc_cq_begin_op(notification_cq_->cq(), this); diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 9c01f896e6..1b524bc3e8 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -17,6 +17,7 @@ */ #include <grpcpp/server_context.h> +#include <grpcpp/support/server_callback.h> #include <algorithm> #include <mutex> @@ -41,8 +42,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq // must ref the call before calling constructor and after deleting this - CompletionOp(internal::Call* call) + CompletionOp(internal::Call* call, internal::ServerReactor* reactor) : call_(*call), + reactor_(reactor), has_tag_(false), tag_(nullptr), core_cq_tag_(this), @@ -124,9 +126,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { return; } /* Start a dummy op so that we can return the tag */ - GPR_CODEGEN_ASSERT(GRPC_CALL_OK == - g_core_codegen_interface->grpc_call_start_batch( - call_.call(), nullptr, 0, this, nullptr)); + GPR_CODEGEN_ASSERT( + GRPC_CALL_OK == + grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr)); } private: @@ -136,13 +138,14 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { } internal::Call call_; + internal::ServerReactor* reactor_; bool has_tag_; void* tag_; void* core_cq_tag_; std::mutex mu_; int refs_; bool finalized_; - int cancelled_; + int cancelled_; // This is an int (not bool) because it is passed to core bool done_intercepting_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; @@ -190,7 +193,16 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { } finalized_ = true; - if (!*status) cancelled_ = 1; + // If for some reason the incoming status is false, mark that as a + // cancellation. + // TODO(vjpai): does this ever happen? + if (!*status) { + cancelled_ = 1; + } + + if (cancelled_ && (reactor_ != nullptr)) { + reactor_->OnCancel(); + } /* Release the lock since we are going to be running through interceptors now */ lock.unlock(); @@ -251,21 +263,25 @@ void ServerContext::Clear() { initial_metadata_.clear(); trailing_metadata_.clear(); client_metadata_.Reset(); - if (call_) { - grpc_call_unref(call_); - } if (completion_op_) { completion_op_->Unref(); + completion_op_ = nullptr; completion_tag_.Clear(); } if (rpc_info_) { rpc_info_->Unref(); + rpc_info_ = nullptr; + } + if (call_) { + auto* call = call_; + call_ = nullptr; + grpc_call_unref(call); } - // Don't need to clear out call_, completion_op_, or rpc_info_ because this is - // either called from destructor or just before Setup } -void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { +void ServerContext::BeginCompletionOp(internal::Call* call, + std::function<void(bool)> callback, + internal::ServerReactor* reactor) { GPR_ASSERT(!completion_op_); if (rpc_info_) { rpc_info_->Ref(); @@ -273,10 +289,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { grpc_call_ref(call->call()); completion_op_ = new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) - CompletionOp(call); - if (callback) { - completion_tag_.Set(call->call(), nullptr, completion_op_); + CompletionOp(call, reactor); + if (callback != nullptr) { + completion_tag_.Set(call->call(), std::move(callback), completion_op_); completion_op_->set_core_cq_tag(&completion_tag_); + completion_op_->set_tag(completion_op_); } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } diff --git a/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs b/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs index 3a161763fd..b10d7a1045 100644 --- a/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs +++ b/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs @@ -25,7 +25,7 @@ namespace Grpc.Core.Tests { public class AppDomainUnloadTest { -#if NETCOREAPP1_0 +#if NETCOREAPP1_1 || NETCOREAPP2_1 [Test] [Ignore("Not supported for CoreCLR")] public void AppDomainUnloadHookCanCleanupAbandonedCall() diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index d58f046824..178931a3d7 100755 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.Core.Tests</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.Core.Tests</PackageId> diff --git a/src/csharp/Grpc.Core.Tests/NUnitMain.cs b/src/csharp/Grpc.Core.Tests/NUnitMain.cs index 49cb8cd3b9..221a6b823a 100644 --- a/src/csharp/Grpc.Core.Tests/NUnitMain.cs +++ b/src/csharp/Grpc.Core.Tests/NUnitMain.cs @@ -34,7 +34,7 @@ namespace Grpc.Core.Tests { // Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406. GrpcEnvironment.SetLogger(new ConsoleLogger()); -#if NETCOREAPP1_0 +#if NETCOREAPP1_1 || NETCOREAPP2_1 return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In); #else return new AutoRun().Execute(args); diff --git a/src/csharp/Grpc.Core.Tests/SanityTest.cs b/src/csharp/Grpc.Core.Tests/SanityTest.cs index 0904453b6e..f785f70f4c 100644 --- a/src/csharp/Grpc.Core.Tests/SanityTest.cs +++ b/src/csharp/Grpc.Core.Tests/SanityTest.cs @@ -31,7 +31,7 @@ namespace Grpc.Core.Tests public class SanityTest { // TODO: make sanity test work for CoreCLR as well -#if !NETCOREAPP1_0 +#if !NETCOREAPP1_1 && !NETCOREAPP2_1 /// <summary> /// Because we depend on a native library, sometimes when things go wrong, the /// entire NUnit test process crashes. To be able to track down problems better, @@ -44,7 +44,7 @@ namespace Grpc.Core.Tests public void TestsJsonUpToDate() { var discoveredTests = DiscoverAllTestClasses(); - var testsFromFile + var testsFromFile = JsonConvert.DeserializeObject<Dictionary<string, List<string>>>(ReadTestsJson()); Assert.AreEqual(discoveredTests, testsFromFile); diff --git a/src/csharp/Grpc.Core/Internal/NativeExtension.cs b/src/csharp/Grpc.Core/Internal/NativeExtension.cs index f526b913af..5177b69fd9 100644 --- a/src/csharp/Grpc.Core/Internal/NativeExtension.cs +++ b/src/csharp/Grpc.Core/Internal/NativeExtension.cs @@ -83,13 +83,13 @@ namespace Grpc.Core.Internal // See https://github.com/grpc/grpc/pull/7303 for one option. var assemblyDirectory = Path.GetDirectoryName(GetAssemblyPath()); - // With old-style VS projects, the native libraries get copied using a .targets rule to the build output folder + // With "classic" VS projects, the native libraries get copied using a .targets rule to the build output folder // alongside the compiled assembly. // With dotnet cli projects targeting net45 framework, the native libraries (just the required ones) // are similarly copied to the built output folder, through the magic of Microsoft.NETCore.Platforms. var classicPath = Path.Combine(assemblyDirectory, GetNativeLibraryFilename()); - // With dotnet cli project targeting netcoreapp1.0, projects will use Grpc.Core assembly directly in the location where it got restored + // With dotnet cli project targeting netcoreappX.Y, projects will use Grpc.Core assembly directly in the location where it got restored // by nuget. We locate the native libraries based on known structure of Grpc.Core nuget package. // When "dotnet publish" is used, the runtimes directory is copied next to the published assemblies. string runtimesDirectory = string.Format("runtimes/{0}/native", GetPlatformString()); diff --git a/src/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj b/src/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj index db4e3ef4e3..1afcd9fba0 100755 --- a/src/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj +++ b/src/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.Examples.MathClient</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.Examples.MathClient</PackageId> diff --git a/src/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj b/src/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj index b12b418d01..75ef6d1008 100755 --- a/src/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj +++ b/src/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.Examples.MathServer</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.Examples.MathServer</PackageId> diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj index 7493eb8051..93d112a0c5 100755 --- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj +++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.Examples.Tests</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.Examples.Tests</PackageId> diff --git a/src/csharp/Grpc.Examples.Tests/NUnitMain.cs b/src/csharp/Grpc.Examples.Tests/NUnitMain.cs index bcb8b46b64..0b9a5258ef 100644 --- a/src/csharp/Grpc.Examples.Tests/NUnitMain.cs +++ b/src/csharp/Grpc.Examples.Tests/NUnitMain.cs @@ -34,7 +34,7 @@ namespace Grpc.Examples.Tests { // Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406. GrpcEnvironment.SetLogger(new ConsoleLogger()); -#if NETCOREAPP1_0 +#if NETCOREAPP1_1 || NETCOREAPP2_1 return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In); #else return new AutoRun().Execute(args); diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj index baa3b4ce6c..9ce2b59d03 100755 --- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj +++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.Examples</AssemblyName> <PackageId>Grpc.Examples</PackageId> <TreatWarningsAsErrors>true</TreatWarningsAsErrors> diff --git a/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj b/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj index 616e56df10..2a037a72e5 100755 --- a/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj +++ b/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.HealthCheck.Tests</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.HealthCheck.Tests</PackageId> diff --git a/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs b/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs index 365551e895..f5091863b8 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs @@ -34,7 +34,7 @@ namespace Grpc.HealthCheck.Tests { // Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406. GrpcEnvironment.SetLogger(new ConsoleLogger()); -#if NETCOREAPP1_0 +#if NETCOREAPP1_1 || NETCOREAPP2_1 return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In); #else return new AutoRun().Execute(args); diff --git a/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj b/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj index 35713156ea..1cd4b83e1e 100755 --- a/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj +++ b/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.IntegrationTesting.Client</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.IntegrationTesting.Client</PackageId> diff --git a/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj b/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj index 3ecefe3bc4..2890a7df58 100755 --- a/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj +++ b/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.IntegrationTesting.QpsWorker</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.IntegrationTesting.QpsWorker</PackageId> diff --git a/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj b/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj index 1092b2c21e..ee718958bc 100755 --- a/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj +++ b/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.IntegrationTesting.Server</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.IntegrationTesting.Server</PackageId> @@ -19,7 +19,7 @@ <Reference Include="System" /> <Reference Include="Microsoft.CSharp" /> </ItemGroup> - + <ItemGroup> <Compile Include="..\Grpc.Core\Version.cs" /> </ItemGroup> diff --git a/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj b/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj index 22272547f6..99926497e4 100755 --- a/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj +++ b/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.IntegrationTesting.StressClient</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.IntegrationTesting.StressClient</PackageId> diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 8daf3fa98b..c342f8a107 100755 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.IntegrationTesting</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.IntegrationTesting</PackageId> diff --git a/src/csharp/Grpc.IntegrationTesting/NUnitMain.cs b/src/csharp/Grpc.IntegrationTesting/NUnitMain.cs index 9d24762e0a..6265953622 100644 --- a/src/csharp/Grpc.IntegrationTesting/NUnitMain.cs +++ b/src/csharp/Grpc.IntegrationTesting/NUnitMain.cs @@ -34,7 +34,7 @@ namespace Grpc.IntegrationTesting { // Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406. GrpcEnvironment.SetLogger(new ConsoleLogger()); -#if NETCOREAPP1_0 +#if NETCOREAPP1_1 || NETCOREAPP2_1 return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In); #else return new AutoRun().Execute(args); diff --git a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj index d39d46cf1b..5b1656080a 100644 --- a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj +++ b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.Microbenchmarks</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.Microbenchmarks</PackageId> diff --git a/src/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj b/src/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj index 0c12f38f25..8b586c6ecb 100755 --- a/src/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj +++ b/src/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj @@ -4,7 +4,7 @@ <Import Project="..\Grpc.Core\Common.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <AssemblyName>Grpc.Reflection.Tests</AssemblyName> <OutputType>Exe</OutputType> <PackageId>Grpc.Reflection.Tests</PackageId> diff --git a/src/csharp/Grpc.Reflection.Tests/NUnitMain.cs b/src/csharp/Grpc.Reflection.Tests/NUnitMain.cs index 49ed1cc8d4..1be7f6542b 100644 --- a/src/csharp/Grpc.Reflection.Tests/NUnitMain.cs +++ b/src/csharp/Grpc.Reflection.Tests/NUnitMain.cs @@ -34,7 +34,7 @@ namespace Grpc.Reflection.Tests { // Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406. GrpcEnvironment.SetLogger(new ConsoleLogger()); -#if NETCOREAPP1_0 +#if NETCOREAPP1_1 || NETCOREAPP2_1 return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In); #else return new AutoRun().Execute(args); diff --git a/src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj b/src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj index a2d4874eec..cfb40f44ae 100644 --- a/src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj +++ b/src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj @@ -3,7 +3,7 @@ <Import Project="..\Grpc.Core\Version.csproj.include" /> <PropertyGroup> - <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks> <OutputType>Exe</OutputType> </PropertyGroup> diff --git a/src/csharp/Grpc.Tools.Tests/NUnitMain.cs b/src/csharp/Grpc.Tools.Tests/NUnitMain.cs index 418c33820e..13ba8b51d0 100644 --- a/src/csharp/Grpc.Tools.Tests/NUnitMain.cs +++ b/src/csharp/Grpc.Tools.Tests/NUnitMain.cs @@ -24,7 +24,7 @@ namespace Grpc.Tools.Tests static class NUnitMain { public static int Main(string[] args) => -#if NETCOREAPP1_0 || NETCOREAPP1_1 +#if NETCOREAPP1_1 || NETCOREAPP2_1 new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args); #else new AutoRun().Execute(args); diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c index f7d5a85876..111c6f4867 100644 --- a/src/php/ext/grpc/php_grpc.c +++ b/src/php/ext/grpc/php_grpc.c @@ -170,7 +170,9 @@ void prefork() { acquire_persistent_locks(); } -void postfork_child(TSRMLS_D) { +void postfork_child() { + TSRMLS_FETCH(); + // loop through persistant list and destroy all underlying grpc_channel objs destroy_grpc_channels(); diff --git a/src/proto/grpc/channelz/BUILD b/src/proto/grpc/channelz/BUILD index bdb03d5e2d..b6b485e3e8 100644 --- a/src/proto/grpc/channelz/BUILD +++ b/src/proto/grpc/channelz/BUILD @@ -24,3 +24,10 @@ grpc_proto_library( has_services = True, well_known_protos = True, ) + +filegroup( + name = "channelz_proto_file", + srcs = [ + "channelz.proto", + ], +) diff --git a/src/proto/grpc/health/v1/BUILD b/src/proto/grpc/health/v1/BUILD index 97642985c9..38a7d992e0 100644 --- a/src/proto/grpc/health/v1/BUILD +++ b/src/proto/grpc/health/v1/BUILD @@ -29,4 +29,3 @@ filegroup( "health.proto", ], ) - diff --git a/src/proto/grpc/testing/compiler_test.proto b/src/proto/grpc/testing/compiler_test.proto index db5ca48331..9fa5590a59 100644 --- a/src/proto/grpc/testing/compiler_test.proto +++ b/src/proto/grpc/testing/compiler_test.proto @@ -20,6 +20,9 @@ syntax = "proto3"; // Ignored detached comment +// The comments in this file are not meant for readability +// but rather to test to make sure that the code generator +// properly preserves comments on files, services, and RPCs // Ignored package leading comment package grpc.testing; diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 88c5b6d5be..6022fc3ef2 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1723,7 +1723,7 @@ def server(thread_pool, handlers. The interceptors are given control in the order they are specified. This is an EXPERIMENTAL API. options: An optional list of key-value pairs (channel args in gRPC runtime) - to configure the channel. + to configure the channel. maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server will service before returning RESOURCE_EXHAUSTED status, or None to indicate no limit. diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index cfd3a51d9b..e318298d0a 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -12,6 +12,7 @@ pyx_library( "_cygrpc/grpc_string.pyx.pxi", "_cygrpc/arguments.pyx.pxi", "_cygrpc/call.pyx.pxi", + "_cygrpc/channelz.pyx.pxi", "_cygrpc/channel.pyx.pxi", "_cygrpc/credentials.pyx.pxi", "_cygrpc/completion_queue.pyx.pxi", diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channelz.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channelz.pyx.pxi new file mode 100644 index 0000000000..113f7976dd --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channelz.pyx.pxi @@ -0,0 +1,69 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def channelz_get_top_channels(start_channel_id): + cdef char *c_returned_str = grpc_channelz_get_top_channels( + start_channel_id, + ) + if c_returned_str == NULL: + raise ValueError('Failed to get top channels, please ensure your' \ + ' start_channel_id==%s is valid' % start_channel_id) + return c_returned_str + +def channelz_get_servers(start_server_id): + cdef char *c_returned_str = grpc_channelz_get_servers(start_server_id) + if c_returned_str == NULL: + raise ValueError('Failed to get servers, please ensure your' \ + ' start_server_id==%s is valid' % start_server_id) + return c_returned_str + +def channelz_get_server(server_id): + cdef char *c_returned_str = grpc_channelz_get_server(server_id) + if c_returned_str == NULL: + raise ValueError('Failed to get the server, please ensure your' \ + ' server_id==%s is valid' % server_id) + return c_returned_str + +def channelz_get_server_sockets(server_id, start_socket_id): + cdef char *c_returned_str = grpc_channelz_get_server_sockets( + server_id, + start_socket_id, + ) + if c_returned_str == NULL: + raise ValueError('Failed to get server sockets, please ensure your' \ + ' server_id==%s and start_socket_id==%s is valid' % + (server_id, start_socket_id)) + return c_returned_str + +def channelz_get_channel(channel_id): + cdef char *c_returned_str = grpc_channelz_get_channel(channel_id) + if c_returned_str == NULL: + raise ValueError('Failed to get the channel, please ensure your' \ + ' channel_id==%s is valid' % (channel_id)) + return c_returned_str + +def channelz_get_subchannel(subchannel_id): + cdef char *c_returned_str = grpc_channelz_get_subchannel(subchannel_id) + if c_returned_str == NULL: + raise ValueError('Failed to get the subchannel, please ensure your' \ + ' subchannel_id==%s is valid' % (subchannel_id)) + return c_returned_str + +def channelz_get_socket(socket_id): + cdef char *c_returned_str = grpc_channelz_get_socket(socket_id) + if c_returned_str == NULL: + raise ValueError('Failed to get the socket, please ensure your' \ + ' socket_id==%s is valid' % (socket_id)) + return c_returned_str diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 5aaf31e36c..5bbc10af25 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -13,6 +13,7 @@ # limitations under the License. cimport libc.time +from libc.stdint cimport intptr_t # Typedef types with approximately the same semantics to provide their names to @@ -384,6 +385,15 @@ cdef extern from "grpc/grpc.h": void grpc_server_cancel_all_calls(grpc_server *server) nogil void grpc_server_destroy(grpc_server *server) nogil + char* grpc_channelz_get_top_channels(intptr_t start_channel_id) + char* grpc_channelz_get_servers(intptr_t start_server_id) + char* grpc_channelz_get_server(intptr_t server_id) + char* grpc_channelz_get_server_sockets(intptr_t server_id, + intptr_t start_socket_id) + char* grpc_channelz_get_channel(intptr_t channel_id) + char* grpc_channelz_get_subchannel(intptr_t subchannel_id) + char* grpc_channelz_get_socket(intptr_t socket_id) + cdef extern from "grpc/grpc_security.h": diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index ae5c07bfc8..9ab919375c 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -35,6 +35,7 @@ include "_cygrpc/server.pyx.pxi" include "_cygrpc/tag.pyx.pxi" include "_cygrpc/time.pyx.pxi" include "_cygrpc/_hooks.pyx.pxi" +include "_cygrpc/channelz.pyx.pxi" include "_cygrpc/grpc_gevent.pyx.pxi" diff --git a/src/python/grpcio_channelz/.gitignore b/src/python/grpcio_channelz/.gitignore new file mode 100644 index 0000000000..0c5da6b5af --- /dev/null +++ b/src/python/grpcio_channelz/.gitignore @@ -0,0 +1,6 @@ +*.proto +*_pb2.py +*_pb2_grpc.py +build/ +grpcio_channelz.egg-info/ +dist/ diff --git a/src/python/grpcio_channelz/MANIFEST.in b/src/python/grpcio_channelz/MANIFEST.in new file mode 100644 index 0000000000..5597f375ba --- /dev/null +++ b/src/python/grpcio_channelz/MANIFEST.in @@ -0,0 +1,3 @@ +include grpc_version.py +recursive-include grpc_channelz *.py +global-exclude *.pyc diff --git a/src/python/grpcio_channelz/README.rst b/src/python/grpcio_channelz/README.rst new file mode 100644 index 0000000000..efeaa56064 --- /dev/null +++ b/src/python/grpcio_channelz/README.rst @@ -0,0 +1,9 @@ +gRPC Python Channelz package +============================== + +Channelz is a live debug tool in gRPC Python. + +Dependencies +------------ + +Depends on the `grpcio` package, available from PyPI via `pip install grpcio`. diff --git a/src/python/grpcio_channelz/channelz_commands.py b/src/python/grpcio_channelz/channelz_commands.py new file mode 100644 index 0000000000..e9ad355034 --- /dev/null +++ b/src/python/grpcio_channelz/channelz_commands.py @@ -0,0 +1,63 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Provides distutils command classes for the GRPC Python setup process.""" + +import os +import shutil + +import setuptools + +ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__))) +CHANNELZ_PROTO = os.path.join(ROOT_DIR, + '../../proto/grpc/channelz/channelz.proto') + + +class CopyProtoModules(setuptools.Command): + """Command to copy proto modules from grpc/src/proto.""" + + description = '' + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + if os.path.isfile(CHANNELZ_PROTO): + shutil.copyfile(CHANNELZ_PROTO, + os.path.join(ROOT_DIR, + 'grpc_channelz/v1/channelz.proto')) + + +class BuildPackageProtos(setuptools.Command): + """Command to generate project *_pb2.py modules from proto files.""" + + description = 'build grpc protobuf modules' + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + # due to limitations of the proto generator, we require that only *one* + # directory is provided as an 'include' directory. We assume it's the '' key + # to `self.distribution.package_dir` (and get a key error if it's not + # there). + from grpc_tools import command + command.build_package_protos(self.distribution.package_dir['']) diff --git a/src/python/grpcio_channelz/grpc_channelz/__init__.py b/src/python/grpcio_channelz/grpc_channelz/__init__.py new file mode 100644 index 0000000000..38fdfc9c5c --- /dev/null +++ b/src/python/grpcio_channelz/grpc_channelz/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel b/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel new file mode 100644 index 0000000000..aae8cedb76 --- /dev/null +++ b/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel @@ -0,0 +1,38 @@ +load("@grpc_python_dependencies//:requirements.bzl", "requirement") +load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library") + +package(default_visibility = ["//visibility:public"]) + +genrule( + name = "mv_channelz_proto", + srcs = [ + "//src/proto/grpc/channelz:channelz_proto_file", + ], + outs = ["channelz.proto",], + cmd = "cp $< $@", +) + +py_proto_library( + name = "py_channelz_proto", + protos = ["mv_channelz_proto",], + imports = [ + "external/com_google_protobuf/src/", + ], + inputs = [ + "@com_google_protobuf//:well_known_protos", + ], + with_grpc = True, + deps = [ + requirement('protobuf'), + ], +) + +py_library( + name = "grpc_channelz", + srcs = ["channelz.py",], + deps = [ + ":py_channelz_proto", + "//src/python/grpcio/grpc:grpcio", + ], + imports=["../../",], +) diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/__init__.py b/src/python/grpcio_channelz/grpc_channelz/v1/__init__.py new file mode 100644 index 0000000000..38fdfc9c5c --- /dev/null +++ b/src/python/grpcio_channelz/grpc_channelz/v1/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py b/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py new file mode 100644 index 0000000000..573b9d0d5a --- /dev/null +++ b/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py @@ -0,0 +1,141 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Channelz debug service implementation in gRPC Python.""" + +import grpc +from grpc._cython import cygrpc + +import grpc_channelz.v1.channelz_pb2 as _channelz_pb2 +import grpc_channelz.v1.channelz_pb2_grpc as _channelz_pb2_grpc + +from google.protobuf import json_format + + +class ChannelzServicer(_channelz_pb2_grpc.ChannelzServicer): + """Servicer handling RPCs for service statuses.""" + + @staticmethod + def GetTopChannels(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_top_channels(request.start_channel_id), + _channelz_pb2.GetTopChannelsResponse(), + ) + except (ValueError, json_format.ParseError) as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetServers(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_servers(request.start_server_id), + _channelz_pb2.GetServersResponse(), + ) + except (ValueError, json_format.ParseError) as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetServer(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_server(request.server_id), + _channelz_pb2.GetServerResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetServerSockets(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_server_sockets(request.server_id, + request.start_socket_id), + _channelz_pb2.GetServerSocketsResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetChannel(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_channel(request.channel_id), + _channelz_pb2.GetChannelResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetSubchannel(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_subchannel(request.subchannel_id), + _channelz_pb2.GetSubchannelResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetSocket(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_socket(request.socket_id), + _channelz_pb2.GetSocketResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + +def add_channelz_servicer(server): + """Add Channelz servicer to a server. Channelz servicer is in charge of + pulling information from C-Core for entire process. It will allow the + server to response to Channelz queries. + + The Channelz statistic is enabled by default inside C-Core. Whether the + statistic is enabled or not is isolated from adding Channelz servicer. + That means you can query Channelz info with a Channelz-disabled channel, + and you can add Channelz servicer to a Channelz-disabled server. + + The Channelz statistic can be enabled or disabled by channel option + 'grpc.enable_channelz'. Set to 1 to enable, set to 0 to disable. + + This is an EXPERIMENTAL API. + + Args: + server: grpc.Server to which Channelz service will be added. + """ + _channelz_pb2_grpc.add_ChannelzServicer_to_server(ChannelzServicer(), + server) diff --git a/src/python/grpcio_channelz/grpc_version.py b/src/python/grpcio_channelz/grpc_version.py new file mode 100644 index 0000000000..16356ea402 --- /dev/null +++ b/src/python/grpcio_channelz/grpc_version.py @@ -0,0 +1,17 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_channelz/grpc_version.py.template`!!! + +VERSION = '1.18.0.dev0' diff --git a/src/python/grpcio_channelz/setup.py b/src/python/grpcio_channelz/setup.py new file mode 100644 index 0000000000..a495052376 --- /dev/null +++ b/src/python/grpcio_channelz/setup.py @@ -0,0 +1,96 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Setup module for the GRPC Python package's Channelz.""" + +import os +import sys + +import setuptools + +# Ensure we're in the proper directory whether or not we're being used by pip. +os.chdir(os.path.dirname(os.path.abspath(__file__))) + +# Break import-style to ensure we can actually find our local modules. +import grpc_version + + +class _NoOpCommand(setuptools.Command): + """No-op command.""" + + description = '' + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + pass + + +CLASSIFIERS = [ + 'Development Status :: 5 - Production/Stable', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'License :: OSI Approved :: Apache Software License', +] + +PACKAGE_DIRECTORIES = { + '': '.', +} + +INSTALL_REQUIRES = ( + 'protobuf>=3.6.0', + 'grpcio>={version}'.format(version=grpc_version.VERSION), +) + +try: + import channelz_commands as _channelz_commands + # we are in the build environment, otherwise the above import fails + SETUP_REQUIRES = ( + 'grpcio-tools=={version}'.format(version=grpc_version.VERSION),) + COMMAND_CLASS = { + # Run preprocess from the repository *before* doing any packaging! + 'preprocess': _channelz_commands.CopyProtoModules, + 'build_package_protos': _channelz_commands.BuildPackageProtos, + } +except ImportError: + SETUP_REQUIRES = () + COMMAND_CLASS = { + # wire up commands to no-op not to break the external dependencies + 'preprocess': _NoOpCommand, + 'build_package_protos': _NoOpCommand, + } + +setuptools.setup( + name='grpcio-channelz', + version=grpc_version.VERSION, + license='Apache License 2.0', + description='Channel Level Live Debug Information Service for gRPC', + author='The gRPC Authors', + author_email='grpc-io@googlegroups.com', + classifiers=CLASSIFIERS, + url='https://grpc.io', + package_dir=PACKAGE_DIRECTORIES, + packages=setuptools.find_packages('.'), + install_requires=INSTALL_REQUIRES, + setup_requires=SETUP_REQUIRES, + cmdclass=COMMAND_CLASS) diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index d163f6fb68..65e9a99950 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -132,7 +132,12 @@ class TestGevent(setuptools.Command): 'unit.beta._beta_features_test', # TODO(https://github.com/grpc/grpc/issues/15411) unpin gevent version # This test will stuck while running higher version of gevent - 'unit._auth_context_test.AuthContextTest.testSessionResumption') + 'unit._auth_context_test.AuthContextTest.testSessionResumption', + # TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests + 'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels', + 'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels_and_sockets', + 'channelz._channelz_servicer_test.ChannelzServicerTest.test_streaming_rpc' + ) description = 'run tests with gevent. Assumes grpc/gevent are installed' user_options = [] diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index 61c98fa038..f56425ac6d 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -39,6 +39,7 @@ PACKAGE_DIRECTORIES = { INSTALL_REQUIRES = ( 'coverage>=4.0', 'enum34>=1.0.4', 'grpcio>={version}'.format(version=grpc_version.VERSION), + 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION), 'grpcio-tools>={version}'.format(version=grpc_version.VERSION), 'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION), 'oauth2client>=1.4.7', 'protobuf>=3.6.0', 'six>=1.10', 'google-auth>=1.0.0', diff --git a/src/python/grpcio_tests/tests/channelz/BUILD.bazel b/src/python/grpcio_tests/tests/channelz/BUILD.bazel new file mode 100644 index 0000000000..63513616e7 --- /dev/null +++ b/src/python/grpcio_tests/tests/channelz/BUILD.bazel @@ -0,0 +1,15 @@ +package(default_visibility = ["//visibility:public"]) + +py_test( + name = "channelz_servicer_test", + srcs = ["_channelz_servicer_test.py"], + main = "_channelz_servicer_test.py", + size = "small", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + "//src/python/grpcio_tests/tests/unit:test_common", + "//src/python/grpcio_tests/tests/unit/framework/common:common", + ], + imports = ["../../",], +) diff --git a/src/python/grpcio_tests/tests/channelz/__init__.py b/src/python/grpcio_tests/tests/channelz/__init__.py new file mode 100644 index 0000000000..38fdfc9c5c --- /dev/null +++ b/src/python/grpcio_tests/tests/channelz/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py new file mode 100644 index 0000000000..84f8594689 --- /dev/null +++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py @@ -0,0 +1,494 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests of grpc_channelz.v1.channelz.""" + +import unittest + +from concurrent import futures + +import grpc +from grpc_channelz.v1 import channelz +from grpc_channelz.v1 import channelz_pb2 +from grpc_channelz.v1 import channelz_pb2_grpc + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary' +_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary' +_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream' + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x01\x01\x01' + +_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),) +_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),) +_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),) + + +def _successful_unary_unary(request, servicer_context): + return _RESPONSE + + +def _failed_unary_unary(request, servicer_context): + servicer_context.set_code(grpc.StatusCode.INTERNAL) + servicer_context.set_details("Channelz Test Intended Failure") + + +def _successful_stream_stream(request_iterator, servicer_context): + for _ in request_iterator: + yield _RESPONSE + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY: + return grpc.unary_unary_rpc_method_handler(_successful_unary_unary) + elif handler_call_details.method == _FAILED_UNARY_UNARY: + return grpc.unary_unary_rpc_method_handler(_failed_unary_unary) + elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM: + return grpc.stream_stream_rpc_method_handler( + _successful_stream_stream) + else: + return None + + +class _ChannelServerPair(object): + + def __init__(self): + # Server will enable channelz service + # Bind as attribute, so its `del` can be called explicitly, during + # the destruction process. Otherwise, if the removal of server + # rely on gc cycle, the test will become non-deterministic. + self._server = grpc.server( + futures.ThreadPoolExecutor(max_workers=3), + options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ) + port = self._server.add_insecure_port('[::]:0') + self._server.add_generic_rpc_handlers((_GenericHandler(),)) + self._server.start() + + # Channel will enable channelz service... + self.channel = grpc.insecure_channel('localhost:%d' % port, + _ENABLE_CHANNELZ) + + def __del__(self): + self._server.__del__() + self.channel.close() + + +def _generate_channel_server_pairs(n): + return [_ChannelServerPair() for i in range(n)] + + +def _clean_channel_server_pairs(pairs): + for pair in pairs: + pair.__del__() + + +class ChannelzServicerTest(unittest.TestCase): + + def _send_successful_unary_unary(self, idx): + _, r = self._pairs[idx].channel.unary_unary( + _SUCCESSFUL_UNARY_UNARY).with_call(_REQUEST) + self.assertEqual(r.code(), grpc.StatusCode.OK) + + def _send_failed_unary_unary(self, idx): + try: + self._pairs[idx].channel.unary_unary(_FAILED_UNARY_UNARY).with_call( + _REQUEST) + except grpc.RpcError: + return + else: + self.fail("This call supposed to fail") + + def _send_successful_stream_stream(self, idx): + response_iterator = self._pairs[idx].channel.stream_stream( + _SUCCESSFUL_STREAM_STREAM).__call__( + iter([_REQUEST] * test_constants.STREAM_LENGTH)) + cnt = 0 + for _ in response_iterator: + cnt += 1 + self.assertEqual(cnt, test_constants.STREAM_LENGTH) + + def _get_channel_id(self, idx): + """Channel id may not be consecutive""" + resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertGreater(len(resp.channel), idx) + return resp.channel[idx].ref.channel_id + + def setUp(self): + self._pairs = [] + # This server is for Channelz info fetching only + # It self should not enable Channelz + self._server = grpc.server( + futures.ThreadPoolExecutor(max_workers=3), + options=_DISABLE_REUSE_PORT + _DISABLE_CHANNELZ) + port = self._server.add_insecure_port('[::]:0') + channelz.add_channelz_servicer(self._server) + self._server.start() + + # This channel is used to fetch Channelz info only + # Channelz should not be enabled + self._channel = grpc.insecure_channel('localhost:%d' % port, + _DISABLE_CHANNELZ) + self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel) + + def tearDown(self): + self._server.__del__() + self._channel.close() + _clean_channel_server_pairs(self._pairs) + + def test_get_top_channels_basic(self): + self._pairs = _generate_channel_server_pairs(1) + resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(resp.channel), 1) + self.assertEqual(resp.end, True) + + def test_get_top_channels_high_start_id(self): + self._pairs = _generate_channel_server_pairs(1) + resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=10000)) + self.assertEqual(len(resp.channel), 0) + self.assertEqual(resp.end, True) + + def test_successful_request(self): + self._pairs = _generate_channel_server_pairs(1) + self._send_successful_unary_unary(0) + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(resp.channel.data.calls_started, 1) + self.assertEqual(resp.channel.data.calls_succeeded, 1) + self.assertEqual(resp.channel.data.calls_failed, 0) + + def test_failed_request(self): + self._pairs = _generate_channel_server_pairs(1) + self._send_failed_unary_unary(0) + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(resp.channel.data.calls_started, 1) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, 1) + + def test_many_requests(self): + self._pairs = _generate_channel_server_pairs(1) + k_success = 7 + k_failed = 9 + for i in range(k_success): + self._send_successful_unary_unary(0) + for i in range(k_failed): + self._send_failed_unary_unary(0) + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + def test_many_channel(self): + k_channels = 4 + self._pairs = _generate_channel_server_pairs(k_channels) + resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(resp.channel), k_channels) + + def test_many_requests_many_channel(self): + k_channels = 4 + self._pairs = _generate_channel_server_pairs(k_channels) + k_success = 11 + k_failed = 13 + for i in range(k_success): + self._send_successful_unary_unary(0) + self._send_successful_unary_unary(2) + for i in range(k_failed): + self._send_failed_unary_unary(1) + self._send_failed_unary_unary(2) + + # The first channel saw only successes + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(resp.channel.data.calls_started, k_success) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, 0) + + # The second channel saw only failures + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(1))) + self.assertEqual(resp.channel.data.calls_started, k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + # The third channel saw both successes and failures + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(2))) + self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + # The fourth channel saw nothing + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(3))) + self.assertEqual(resp.channel.data.calls_started, 0) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, 0) + + def test_many_subchannels(self): + k_channels = 4 + self._pairs = _generate_channel_server_pairs(k_channels) + k_success = 17 + k_failed = 19 + for i in range(k_success): + self._send_successful_unary_unary(0) + self._send_successful_unary_unary(2) + for i in range(k_failed): + self._send_failed_unary_unary(1) + self._send_failed_unary_unary(2) + + gtc_resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(gtc_resp.channel), k_channels) + for i in range(k_channels): + # If no call performed in the channel, there shouldn't be any subchannel + if gtc_resp.channel[i].data.calls_started == 0: + self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0) + continue + + # Otherwise, the subchannel should exist + self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0) + gsc_resp = self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gtc_resp.channel[i].subchannel_ref[ + 0].subchannel_id)) + self.assertEqual(gtc_resp.channel[i].data.calls_started, + gsc_resp.subchannel.data.calls_started) + self.assertEqual(gtc_resp.channel[i].data.calls_succeeded, + gsc_resp.subchannel.data.calls_succeeded) + self.assertEqual(gtc_resp.channel[i].data.calls_failed, + gsc_resp.subchannel.data.calls_failed) + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_server_basic(self): + self._pairs = _generate_channel_server_pairs(1) + resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(resp.server), 1) + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_get_one_server(self): + self._pairs = _generate_channel_server_pairs(1) + gss_resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(gss_resp.server), 1) + gs_resp = self._channelz_stub.GetServer( + channelz_pb2.GetServerRequest( + server_id=gss_resp.server[0].ref.server_id)) + self.assertEqual(gss_resp.server[0].ref.server_id, + gs_resp.server.ref.server_id) + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_server_call(self): + self._pairs = _generate_channel_server_pairs(1) + k_success = 23 + k_failed = 29 + for i in range(k_success): + self._send_successful_unary_unary(0) + for i in range(k_failed): + self._send_failed_unary_unary(0) + + resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(resp.server), 1) + self.assertEqual(resp.server[0].data.calls_started, + k_success + k_failed) + self.assertEqual(resp.server[0].data.calls_succeeded, k_success) + self.assertEqual(resp.server[0].data.calls_failed, k_failed) + + def test_many_subchannels_and_sockets(self): + k_channels = 4 + self._pairs = _generate_channel_server_pairs(k_channels) + k_success = 3 + k_failed = 5 + for i in range(k_success): + self._send_successful_unary_unary(0) + self._send_successful_unary_unary(2) + for i in range(k_failed): + self._send_failed_unary_unary(1) + self._send_failed_unary_unary(2) + + gtc_resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(gtc_resp.channel), k_channels) + for i in range(k_channels): + # If no call performed in the channel, there shouldn't be any subchannel + if gtc_resp.channel[i].data.calls_started == 0: + self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0) + continue + + # Otherwise, the subchannel should exist + self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0) + gsc_resp = self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gtc_resp.channel[i].subchannel_ref[ + 0].subchannel_id)) + self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) + + gs_resp = self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.streams_started) + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.streams_succeeded) + # Calls started == messages sent, only valid for unary calls + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.messages_sent) + # Only receive responses when the RPC was successful + self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, + gs_resp.socket.data.messages_received) + + def test_streaming_rpc(self): + self._pairs = _generate_channel_server_pairs(1) + # In C++, the argument for _send_successful_stream_stream is message length. + # Here the argument is still channel idx, to be consistent with the other two. + self._send_successful_stream_stream(0) + + gc_resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(gc_resp.channel.data.calls_started, 1) + self.assertEqual(gc_resp.channel.data.calls_succeeded, 1) + self.assertEqual(gc_resp.channel.data.calls_failed, 0) + # Subchannel exists + self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) + + gsc_resp = self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id)) + self.assertEqual(gsc_resp.subchannel.data.calls_started, 1) + self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1) + self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0) + # Socket exists + self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) + + gs_resp = self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) + self.assertEqual(gs_resp.socket.data.streams_started, 1) + self.assertEqual(gs_resp.socket.data.streams_succeeded, 1) + self.assertEqual(gs_resp.socket.data.streams_failed, 0) + self.assertEqual(gs_resp.socket.data.messages_sent, + test_constants.STREAM_LENGTH) + self.assertEqual(gs_resp.socket.data.messages_received, + test_constants.STREAM_LENGTH) + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_server_sockets(self): + self._pairs = _generate_channel_server_pairs(1) + self._send_successful_unary_unary(0) + self._send_failed_unary_unary(0) + + gs_resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(gs_resp.server), 1) + self.assertEqual(gs_resp.server[0].data.calls_started, 2) + self.assertEqual(gs_resp.server[0].data.calls_succeeded, 1) + self.assertEqual(gs_resp.server[0].data.calls_failed, 1) + + gss_resp = self._channelz_stub.GetServerSockets( + channelz_pb2.GetServerSocketsRequest( + server_id=gs_resp.server[0].ref.server_id, start_socket_id=0)) + # If the RPC call failed, it will raise a grpc.RpcError + # So, if there is no exception raised, considered pass + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_server_listen_sockets(self): + self._pairs = _generate_channel_server_pairs(1) + + gss_resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(gss_resp.server), 1) + self.assertEqual(len(gss_resp.server[0].listen_socket), 1) + + gs_resp = self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=gss_resp.server[0].listen_socket[0].socket_id)) + # If the RPC call failed, it will raise a grpc.RpcError + # So, if there is no exception raised, considered pass + + def test_invalid_query_get_server(self): + try: + self._channelz_stub.GetServer( + channelz_pb2.GetServerRequest(server_id=10000)) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + def test_invalid_query_get_channel(self): + try: + self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=10000)) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + def test_invalid_query_get_subchannel(self): + try: + self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest(subchannel_id=10000)) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + def test_invalid_query_get_socket(self): + try: + self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest(socket_id=10000)) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + def test_invalid_query_get_server_sockets(self): + try: + self._channelz_stub.GetServerSockets( + channelz_pb2.GetServerSocketsRequest( + server_id=10000, + start_socket_id=0, + )) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index a3006d9afc..9cffd3df19 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -1,5 +1,6 @@ [ "_sanity._sanity_test.SanityTest", + "channelz._channelz_servicer_test.ChannelzServicerTest", "health_check._health_servicer_test.HealthServicerTest", "interop._insecure_intraop_test.InsecureIntraopTest", "interop._secure_intraop_test.SecureIntraopTest", diff --git a/src/ruby/pb/grpc/health/checker.rb b/src/ruby/pb/grpc/health/checker.rb index c492455d8f..7ad68409dd 100644 --- a/src/ruby/pb/grpc/health/checker.rb +++ b/src/ruby/pb/grpc/health/checker.rb @@ -14,7 +14,6 @@ require 'grpc' require 'grpc/health/v1/health_services_pb' -require 'thread' module Grpc # Health contains classes and modules that support providing a health check @@ -37,9 +36,9 @@ module Grpc @status_mutex.synchronize do status = @statuses["#{req.service}"] end - if status.nil? + if status.nil? fail GRPC::BadStatus.new_status_exception(StatusCodes::NOT_FOUND) - end + end HealthCheckResponse.new(status: status) end |