diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-12-26 15:10:41 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-12-26 15:10:41 -0800 |
commit | 24e37e249a4db24ff2c886960e3a00311e2591dd (patch) | |
tree | 7bcb4a002432e7ef04054750b1efe8dfd9ba8ade /src/cpp | |
parent | 0911e489e3fe22e2ca5d7c927dac83358f2f05b7 (diff) | |
parent | fc7d0911a3a44d7bc926d3db99b7300a0c0f33dc (diff) |
Merge branch 'master' into failhijackedrecv
Diffstat (limited to 'src/cpp')
24 files changed, 200 insertions, 123 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 8e1cea0269..a31d0b30b1 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -54,13 +54,11 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel( const grpc::string& host, grpc_channel* channel, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) : host_(host), c_channel_(channel) { - if (interceptor_creators != nullptr) { - interceptor_creators_ = std::move(*interceptor_creators); - } + interceptor_creators_ = std::move(interceptor_creators); g_gli_initializer.summon(); } @@ -151,8 +149,9 @@ internal::Call Channel::CreateCallInternal(const internal::RpcMethod& method, // ClientRpcInfo should be set before call because set_call also checks // whether the call has been cancelled, and if the call was cancelled, we // should notify the interceptors too/ - auto* info = context->set_client_rpc_info( - method.name(), this, interceptor_creators_, interceptor_pos); + auto* info = + context->set_client_rpc_info(method.name(), method.method_type(), this, + interceptor_creators_, interceptor_pos); context->set_call(c_call, shared_from_this()); return internal::Call(c_call, this, cq, info); diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 50da75f09c..c9ea3e5f83 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -41,9 +41,10 @@ class DefaultGlobalClientCallbacks final }; static internal::GrpcLibraryInitializer g_gli_initializer; -static DefaultGlobalClientCallbacks g_default_client_callbacks; +static DefaultGlobalClientCallbacks* g_default_client_callbacks = + new DefaultGlobalClientCallbacks(); static ClientContext::GlobalCallbacks* g_client_callbacks = - &g_default_client_callbacks; + g_default_client_callbacks; ClientContext::ClientContext() : initial_metadata_received_(false), @@ -139,9 +140,9 @@ grpc::string ClientContext::peer() const { } void ClientContext::SetGlobalCallbacks(GlobalCallbacks* client_callbacks) { - GPR_ASSERT(g_client_callbacks == &g_default_client_callbacks); + GPR_ASSERT(g_client_callbacks == g_default_client_callbacks); GPR_ASSERT(client_callbacks != nullptr); - GPR_ASSERT(client_callbacks != &g_default_client_callbacks); + GPR_ASSERT(client_callbacks != g_default_client_callbacks); g_client_callbacks = client_callbacks; } diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc index efdff6c265..457daa674c 100644 --- a/src/cpp/client/create_channel.cc +++ b/src/cpp/client/create_channel.cc @@ -39,13 +39,14 @@ std::shared_ptr<Channel> CreateCustomChannel( const std::shared_ptr<ChannelCredentials>& creds, const ChannelArguments& args) { GrpcLibraryCodegen init_lib; // We need to call init in case of a bad creds. - return creds - ? creds->CreateChannel(target, args) - : CreateChannelInternal("", - grpc_lame_client_channel_create( - nullptr, GRPC_STATUS_INVALID_ARGUMENT, - "Invalid credentials."), - nullptr); + return creds ? creds->CreateChannel(target, args) + : CreateChannelInternal( + "", + grpc_lame_client_channel_create( + nullptr, GRPC_STATUS_INVALID_ARGUMENT, + "Invalid credentials."), + std::vector<std::unique_ptr< + experimental::ClientInterceptorFactoryInterface>>()); } namespace experimental { @@ -64,17 +65,18 @@ std::shared_ptr<Channel> CreateCustomChannelWithInterceptors( const grpc::string& target, const std::shared_ptr<ChannelCredentials>& creds, const ChannelArguments& args, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { - return creds - ? creds->CreateChannelWithInterceptors( - target, args, std::move(interceptor_creators)) - : CreateChannelInternal("", - grpc_lame_client_channel_create( - nullptr, GRPC_STATUS_INVALID_ARGUMENT, - "Invalid credentials."), - nullptr); + return creds ? creds->CreateChannelWithInterceptors( + target, args, std::move(interceptor_creators)) + : CreateChannelInternal( + "", + grpc_lame_client_channel_create( + nullptr, GRPC_STATUS_INVALID_ARGUMENT, + "Invalid credentials."), + std::vector<std::unique_ptr< + experimental::ClientInterceptorFactoryInterface>>()); } } // namespace experimental diff --git a/src/cpp/client/create_channel_internal.cc b/src/cpp/client/create_channel_internal.cc index 313d682aae..a0efb97f7e 100644 --- a/src/cpp/client/create_channel_internal.cc +++ b/src/cpp/client/create_channel_internal.cc @@ -26,8 +26,8 @@ namespace grpc { std::shared_ptr<Channel> CreateChannelInternal( const grpc::string& host, grpc_channel* c_channel, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { return std::shared_ptr<Channel>( new Channel(host, c_channel, std::move(interceptor_creators))); diff --git a/src/cpp/client/create_channel_internal.h b/src/cpp/client/create_channel_internal.h index 512fc22866..a90c92c518 100644 --- a/src/cpp/client/create_channel_internal.h +++ b/src/cpp/client/create_channel_internal.h @@ -31,8 +31,8 @@ class Channel; std::shared_ptr<Channel> CreateChannelInternal( const grpc::string& host, grpc_channel* c_channel, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators); } // namespace grpc diff --git a/src/cpp/client/create_channel_posix.cc b/src/cpp/client/create_channel_posix.cc index 8d775e7a87..3affc1ef39 100644 --- a/src/cpp/client/create_channel_posix.cc +++ b/src/cpp/client/create_channel_posix.cc @@ -34,7 +34,8 @@ std::shared_ptr<Channel> CreateInsecureChannelFromFd(const grpc::string& target, init_lib.init(); return CreateChannelInternal( "", grpc_insecure_channel_create_from_fd(target.c_str(), fd, nullptr), - nullptr); + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); } std::shared_ptr<Channel> CreateCustomInsecureChannelFromFd( @@ -46,15 +47,16 @@ std::shared_ptr<Channel> CreateCustomInsecureChannelFromFd( return CreateChannelInternal( "", grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args), - nullptr); + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); } namespace experimental { std::shared_ptr<Channel> CreateCustomInsecureChannelWithInterceptorsFromFd( const grpc::string& target, int fd, const ChannelArguments& args, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { internal::GrpcLibrary init_lib; init_lib.init(); diff --git a/src/cpp/client/cronet_credentials.cc b/src/cpp/client/cronet_credentials.cc index 09a76b428c..b2801764f2 100644 --- a/src/cpp/client/cronet_credentials.cc +++ b/src/cpp/client/cronet_credentials.cc @@ -31,7 +31,10 @@ class CronetChannelCredentialsImpl final : public ChannelCredentials { std::shared_ptr<grpc::Channel> CreateChannel( const string& target, const grpc::ChannelArguments& args) override { - return CreateChannelWithInterceptors(target, args, nullptr); + return CreateChannelWithInterceptors( + target, args, + std::vector<std::unique_ptr< + experimental::ClientInterceptorFactoryInterface>>()); } SecureChannelCredentials* AsSecureCredentials() override { return nullptr; } @@ -39,8 +42,8 @@ class CronetChannelCredentialsImpl final : public ChannelCredentials { private: std::shared_ptr<grpc::Channel> CreateChannelWithInterceptors( const string& target, const grpc::ChannelArguments& args, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) override { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 87902b26f0..f61c1b5317 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -72,4 +72,13 @@ void GenericStub::experimental_type::UnaryCall( context, request, response, std::move(on_completion)); } +void GenericStub::experimental_type::PrepareBidiStreamingCall( + ClientContext* context, const grpc::string& method, + experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor) { + internal::ClientCallbackReaderWriterFactory<ByteBuffer, ByteBuffer>::Create( + stub_->channel_.get(), + internal::RpcMethod(method.c_str(), internal::RpcMethod::BIDI_STREAMING), + context, reactor); +} + } // namespace grpc diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc index b816e0c59a..241ce91803 100644 --- a/src/cpp/client/insecure_credentials.cc +++ b/src/cpp/client/insecure_credentials.cc @@ -32,13 +32,16 @@ class InsecureChannelCredentialsImpl final : public ChannelCredentials { public: std::shared_ptr<grpc::Channel> CreateChannel( const string& target, const grpc::ChannelArguments& args) override { - return CreateChannelWithInterceptors(target, args, nullptr); + return CreateChannelWithInterceptors( + target, args, + std::vector<std::unique_ptr< + experimental::ClientInterceptorFactoryInterface>>()); } std::shared_ptr<grpc::Channel> CreateChannelWithInterceptors( const string& target, const grpc::ChannelArguments& args, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) override { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 7faaa20e78..4d0ed355ab 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -36,14 +36,17 @@ SecureChannelCredentials::SecureChannelCredentials( std::shared_ptr<grpc::Channel> SecureChannelCredentials::CreateChannel( const string& target, const grpc::ChannelArguments& args) { - return CreateChannelWithInterceptors(target, args, nullptr); + return CreateChannelWithInterceptors( + target, args, + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); } std::shared_ptr<grpc::Channel> SecureChannelCredentials::CreateChannelWithInterceptors( const string& target, const grpc::ChannelArguments& args, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); @@ -258,10 +261,10 @@ void MetadataCredentialsPluginWrapper::InvokePlugin( grpc_status_code* status_code, const char** error_details) { std::multimap<grpc::string, grpc::string> metadata; - // const_cast is safe since the SecureAuthContext does not take owndership and - // the object is passed as a const ref to plugin_->GetMetadata. + // const_cast is safe since the SecureAuthContext only inc/dec the refcount + // and the object is passed as a const ref to plugin_->GetMetadata. SecureAuthContext cpp_channel_auth_context( - const_cast<grpc_auth_context*>(context.channel_auth_context), false); + const_cast<grpc_auth_context*>(context.channel_auth_context)); Status status = plugin_->GetMetadata(context.service_url, context.method_name, cpp_channel_auth_context, &metadata); diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h index bfb6e17ee9..4918bd5a4d 100644 --- a/src/cpp/client/secure_credentials.h +++ b/src/cpp/client/secure_credentials.h @@ -24,6 +24,7 @@ #include <grpcpp/security/credentials.h> #include <grpcpp/support/config.h> +#include "src/core/lib/security/credentials/credentials.h" #include "src/cpp/server/thread_pool_interface.h" namespace grpc { @@ -31,7 +32,9 @@ namespace grpc { class SecureChannelCredentials final : public ChannelCredentials { public: explicit SecureChannelCredentials(grpc_channel_credentials* c_creds); - ~SecureChannelCredentials() { grpc_channel_credentials_release(c_creds_); } + ~SecureChannelCredentials() { + if (c_creds_ != nullptr) c_creds_->Unref(); + } grpc_channel_credentials* GetRawCreds() { return c_creds_; } std::shared_ptr<grpc::Channel> CreateChannel( @@ -42,8 +45,8 @@ class SecureChannelCredentials final : public ChannelCredentials { private: std::shared_ptr<grpc::Channel> CreateChannelWithInterceptors( const string& target, const grpc::ChannelArguments& args, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) override; grpc_channel_credentials* const c_creds_; }; @@ -51,7 +54,9 @@ class SecureChannelCredentials final : public ChannelCredentials { class SecureCallCredentials final : public CallCredentials { public: explicit SecureCallCredentials(grpc_call_credentials* c_creds); - ~SecureCallCredentials() { grpc_call_credentials_release(c_creds_); } + ~SecureCallCredentials() { + if (c_creds_ != nullptr) c_creds_->Unref(); + } grpc_call_credentials* GetRawCreds() { return c_creds_; } bool ApplyToCall(grpc_call* call) override; diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 5819a4210b..148f0b9bc9 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -31,10 +31,10 @@ #include <grpc/support/log.h> #include "src/core/lib/debug/trace.h" -namespace grpc { +namespace grpc_impl { namespace internal { -class AlarmImpl : public CompletionQueueTag { +class AlarmImpl : public ::grpc::internal::CompletionQueueTag { public: AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); @@ -51,7 +51,7 @@ class AlarmImpl : public CompletionQueueTag { Unref(); return true; } - void Set(CompletionQueue* cq, gpr_timespec deadline, void* tag) { + void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) { grpc_core::ExecCtx exec_ctx; GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm"); cq_ = cq->cq(); @@ -114,13 +114,14 @@ class AlarmImpl : public CompletionQueueTag { }; } // namespace internal -static internal::GrpcLibraryInitializer g_gli_initializer; +static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer; Alarm::Alarm() : alarm_(new internal::AlarmImpl()) { g_gli_initializer.summon(); } -void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) { +void Alarm::SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline, + void* tag) { // Note that we know that alarm_ is actually an internal::AlarmImpl // but we declared it as the base pointer to avoid a forward declaration // or exposing core data structures in the C++ public headers. @@ -145,4 +146,4 @@ Alarm::~Alarm() { } void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); } -} // namespace grpc +} // namespace grpc_impl diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index 50ee9d871f..214d72f853 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -106,7 +106,9 @@ void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) { } if (!replaced) { + strings_.push_back(grpc::string(mutator_arg.key)); args_.push_back(mutator_arg); + args_.back().key = const_cast<char*>(strings_.back().c_str()); } } diff --git a/src/cpp/common/secure_auth_context.cc b/src/cpp/common/secure_auth_context.cc index 1d66dd3d1f..7a2b5afed6 100644 --- a/src/cpp/common/secure_auth_context.cc +++ b/src/cpp/common/secure_auth_context.cc @@ -22,19 +22,12 @@ namespace grpc { -SecureAuthContext::SecureAuthContext(grpc_auth_context* ctx, - bool take_ownership) - : ctx_(ctx), take_ownership_(take_ownership) {} - -SecureAuthContext::~SecureAuthContext() { - if (take_ownership_) grpc_auth_context_release(ctx_); -} - std::vector<grpc::string_ref> SecureAuthContext::GetPeerIdentity() const { - if (!ctx_) { + if (ctx_ == nullptr) { return std::vector<grpc::string_ref>(); } - grpc_auth_property_iterator iter = grpc_auth_context_peer_identity(ctx_); + grpc_auth_property_iterator iter = + grpc_auth_context_peer_identity(ctx_.get()); std::vector<grpc::string_ref> identity; const grpc_auth_property* property = nullptr; while ((property = grpc_auth_property_iterator_next(&iter))) { @@ -45,20 +38,20 @@ std::vector<grpc::string_ref> SecureAuthContext::GetPeerIdentity() const { } grpc::string SecureAuthContext::GetPeerIdentityPropertyName() const { - if (!ctx_) { + if (ctx_ == nullptr) { return ""; } - const char* name = grpc_auth_context_peer_identity_property_name(ctx_); + const char* name = grpc_auth_context_peer_identity_property_name(ctx_.get()); return name == nullptr ? "" : name; } std::vector<grpc::string_ref> SecureAuthContext::FindPropertyValues( const grpc::string& name) const { - if (!ctx_) { + if (ctx_ == nullptr) { return std::vector<grpc::string_ref>(); } grpc_auth_property_iterator iter = - grpc_auth_context_find_properties_by_name(ctx_, name.c_str()); + grpc_auth_context_find_properties_by_name(ctx_.get(), name.c_str()); const grpc_auth_property* property = nullptr; std::vector<grpc::string_ref> values; while ((property = grpc_auth_property_iterator_next(&iter))) { @@ -68,9 +61,9 @@ std::vector<grpc::string_ref> SecureAuthContext::FindPropertyValues( } AuthPropertyIterator SecureAuthContext::begin() const { - if (ctx_) { + if (ctx_ != nullptr) { grpc_auth_property_iterator iter = - grpc_auth_context_property_iterator(ctx_); + grpc_auth_context_property_iterator(ctx_.get()); const grpc_auth_property* property = grpc_auth_property_iterator_next(&iter); return AuthPropertyIterator(property, &iter); @@ -85,19 +78,20 @@ AuthPropertyIterator SecureAuthContext::end() const { void SecureAuthContext::AddProperty(const grpc::string& key, const grpc::string_ref& value) { - if (!ctx_) return; - grpc_auth_context_add_property(ctx_, key.c_str(), value.data(), value.size()); + if (ctx_ == nullptr) return; + grpc_auth_context_add_property(ctx_.get(), key.c_str(), value.data(), + value.size()); } bool SecureAuthContext::SetPeerIdentityPropertyName(const grpc::string& name) { - if (!ctx_) return false; - return grpc_auth_context_set_peer_identity_property_name(ctx_, + if (ctx_ == nullptr) return false; + return grpc_auth_context_set_peer_identity_property_name(ctx_.get(), name.c_str()) != 0; } bool SecureAuthContext::IsPeerAuthenticated() const { - if (!ctx_) return false; - return grpc_auth_context_peer_is_authenticated(ctx_) != 0; + if (ctx_ == nullptr) return false; + return grpc_auth_context_peer_is_authenticated(ctx_.get()) != 0; } } // namespace grpc diff --git a/src/cpp/common/secure_auth_context.h b/src/cpp/common/secure_auth_context.h index 142617959c..2e8f793721 100644 --- a/src/cpp/common/secure_auth_context.h +++ b/src/cpp/common/secure_auth_context.h @@ -21,15 +21,17 @@ #include <grpcpp/security/auth_context.h> -struct grpc_auth_context; +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/security/context/security_context.h" namespace grpc { class SecureAuthContext final : public AuthContext { public: - SecureAuthContext(grpc_auth_context* ctx, bool take_ownership); + explicit SecureAuthContext(grpc_auth_context* ctx) + : ctx_(ctx != nullptr ? ctx->Ref() : nullptr) {} - ~SecureAuthContext() override; + ~SecureAuthContext() override = default; bool IsPeerAuthenticated() const override; @@ -50,8 +52,7 @@ class SecureAuthContext final : public AuthContext { virtual bool SetPeerIdentityPropertyName(const grpc::string& name) override; private: - grpc_auth_context* ctx_; - bool take_ownership_; + grpc_core::RefCountedPtr<grpc_auth_context> ctx_; }; } // namespace grpc diff --git a/src/cpp/common/secure_create_auth_context.cc b/src/cpp/common/secure_create_auth_context.cc index bc1387c8d7..908c46629e 100644 --- a/src/cpp/common/secure_create_auth_context.cc +++ b/src/cpp/common/secure_create_auth_context.cc @@ -20,6 +20,7 @@ #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpcpp/security/auth_context.h> +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/cpp/common/secure_auth_context.h" namespace grpc { @@ -28,8 +29,8 @@ std::shared_ptr<const AuthContext> CreateAuthContext(grpc_call* call) { if (call == nullptr) { return std::shared_ptr<const AuthContext>(); } - return std::shared_ptr<const AuthContext>( - new SecureAuthContext(grpc_call_auth_context(call), true)); + grpc_core::RefCountedPtr<grpc_auth_context> ctx(grpc_call_auth_context(call)); + return std::make_shared<SecureAuthContext>(ctx.get()); } } // namespace grpc diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc index 8abd45efb7..55da89e6c8 100644 --- a/src/cpp/common/version_cc.cc +++ b/src/cpp/common/version_cc.cc @@ -22,5 +22,5 @@ #include <grpcpp/grpcpp.h> namespace grpc { -grpc::string Version() { return "1.17.0-dev"; } +grpc::string Version() { return "1.18.0-dev"; } } // namespace grpc diff --git a/src/cpp/ext/filters/census/context.cc b/src/cpp/ext/filters/census/context.cc index 78fc69a805..160590353a 100644 --- a/src/cpp/ext/filters/census/context.cc +++ b/src/cpp/ext/filters/census/context.cc @@ -28,6 +28,9 @@ using ::opencensus::trace::SpanContext; void GenerateServerContext(absl::string_view tracing, absl::string_view stats, absl::string_view primary_role, absl::string_view method, CensusContext* context) { + // Destruct the current CensusContext to free the Span memory before + // overwriting it below. + context->~CensusContext(); GrpcTraceContext trace_ctxt; if (TraceContextEncoding::Decode(tracing, &trace_ctxt) != TraceContextEncoding::kEncodeDecodeFailure) { @@ -42,6 +45,9 @@ void GenerateServerContext(absl::string_view tracing, absl::string_view stats, void GenerateClientContext(absl::string_view method, CensusContext* ctxt, CensusContext* parent_ctxt) { + // Destruct the current CensusContext to free the Span memory before + // overwriting it below. + ctxt->~CensusContext(); if (parent_ctxt != nullptr) { SpanContext span_ctxt = parent_ctxt->Context(); Span span = parent_ctxt->Span(); diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc index 9ecb9de7e4..c44a9ac0de 100644 --- a/src/cpp/server/channelz/channelz_service.cc +++ b/src/cpp/server/channelz/channelz_service.cc @@ -79,8 +79,8 @@ Status ChannelzService::GetServer(ServerContext* unused, Status ChannelzService::GetServerSockets( ServerContext* unused, const channelz::v1::GetServerSocketsRequest* request, channelz::v1::GetServerSocketsResponse* response) { - char* json_str = grpc_channelz_get_server_sockets(request->server_id(), - request->start_socket_id()); + char* json_str = grpc_channelz_get_server_sockets( + request->server_id(), request->start_socket_id(), request->max_results()); if (json_str == nullptr) { return Status(StatusCode::INTERNAL, "grpc_channelz_get_server_sockets returned null"); diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index c951c69d51..44aebd2f9d 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -42,18 +42,37 @@ DefaultHealthCheckService::DefaultHealthCheckService() { void DefaultHealthCheckService::SetServingStatus( const grpc::string& service_name, bool serving) { std::unique_lock<std::mutex> lock(mu_); + if (shutdown_) { + // Set to NOT_SERVING in case service_name is not in the map. + serving = false; + } services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); } void DefaultHealthCheckService::SetServingStatus(bool serving) { const ServingStatus status = serving ? SERVING : NOT_SERVING; std::unique_lock<std::mutex> lock(mu_); + if (shutdown_) { + return; + } for (auto& p : services_map_) { ServiceData& service_data = p.second; service_data.SetServingStatus(status); } } +void DefaultHealthCheckService::Shutdown() { + std::unique_lock<std::mutex> lock(mu_); + if (shutdown_) { + return; + } + shutdown_ = true; + for (auto& p : services_map_) { + ServiceData& service_data = p.second; + service_data.SetServingStatus(NOT_SERVING); + } +} + DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::GetServingStatus( const grpc::string& service_name) const { diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 450bd543f5..9551cd2e2c 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -237,6 +237,8 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { bool serving) override; void SetServingStatus(bool serving) override; + void Shutdown() override; + ServingStatus GetServingStatus(const grpc::string& service_name) const; HealthCheckServiceImpl* GetHealthCheckService( @@ -272,6 +274,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); mutable std::mutex mu_; + bool shutdown_ = false; // Guarded by mu_. std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_. std::unique_ptr<HealthCheckServiceImpl> impl_; }; diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index ebb17def32..453e76eb25 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -61,7 +61,7 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor( metadata.insert(std::make_pair(StringRefFromSlice(&md[i].key), StringRefFromSlice(&md[i].value))); } - SecureAuthContext context(ctx, false); + SecureAuthContext context(ctx); AuthMetadataProcessor::OutputMetadata consumed_metadata; AuthMetadataProcessor::OutputMetadata response_metadata; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 8b1658dd27..13741ce7aa 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -84,10 +84,7 @@ class ShutdownTag : public internal::CompletionQueueTag { class DummyTag : public internal::CompletionQueueTag { public: - bool FinalizeResult(void** tag, bool* status) { - *status = true; - return true; - } + bool FinalizeResult(void** tag, bool* status) { return true; } }; class UnimplementedAsyncRequestContext { @@ -239,9 +236,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { : nullptr), request_(nullptr), method_(mrd->method_), - call_(mrd->call_, server, &cq_, server->max_receive_message_size(), - ctx_.set_server_rpc_info(method_->name(), - server->interceptor_creators_)), + call_( + mrd->call_, server, &cq_, server->max_receive_message_size(), + ctx_.set_server_rpc_info(method_->name(), method_->method_type(), + server->interceptor_creators_)), server_(server), global_callbacks_(nullptr), resources_(false) { @@ -294,7 +292,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void ContinueRunAfterInterception() { { - ctx_.BeginCompletionOp(&call_, false); + ctx_.BeginCompletionOp(&call_, nullptr, nullptr); global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); @@ -430,7 +428,8 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { req_->call_, req_->server_, req_->cq_, req_->server_->max_receive_message_size(), req_->ctx_.set_server_rpc_info( - req_->method_->name(), req_->server_->interceptor_creators_)); + req_->method_->name(), req_->method_->method_type(), + req_->server_->interceptor_creators_)); req_->interceptor_methods_.SetCall(call_); req_->interceptor_methods_.SetReverse(); @@ -459,7 +458,6 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { } } void ContinueRunAfterInterception() { - req_->ctx_.BeginCompletionOp(call_, true); req_->method_->handler()->RunHandler( internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, @@ -732,14 +730,15 @@ std::shared_ptr<Channel> Server::InProcessChannel( grpc_channel_args channel_args = args.c_channel_args(); return CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), - nullptr); + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); } std::shared_ptr<Channel> Server::experimental_type::InProcessChannelWithInterceptors( const ChannelArguments& args, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { grpc_channel_args channel_args = args.c_channel_args(); return CreateChannelInternal( @@ -1020,7 +1019,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } } if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_, false); + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); } *tag = tag_; if (delete_on_finalize_) { @@ -1031,7 +1030,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, void ServerInterface::BaseAsyncRequest:: ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_, false); + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); // Queue a tag which will be returned immediately grpc_core::ExecCtx exec_ctx; grpc_cq_begin_op(notification_cq_->cq(), this); @@ -1044,10 +1043,12 @@ void ServerInterface::BaseAsyncRequest:: ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, const char* name) + ServerCompletionQueue* notification_cq, void* tag, const char* name, + internal::RpcMethod::RpcType type) : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, true), - name_(name) {} + name_(name), + type_(type) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -1094,6 +1095,7 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, call_, server_, call_cq_, server_->max_receive_message_size(), context_->set_server_rpc_info( static_cast<GenericServerContext*>(context_)->method_.c_str(), + internal::RpcMethod::BIDI_STREAMING, *server_->interceptor_creators())); return BaseAsyncRequest::FinalizeResult(tag, status); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 396996e5bc..1b524bc3e8 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -17,6 +17,7 @@ */ #include <grpcpp/server_context.h> +#include <grpcpp/support/server_callback.h> #include <algorithm> #include <mutex> @@ -41,8 +42,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq // must ref the call before calling constructor and after deleting this - CompletionOp(internal::Call* call) + CompletionOp(internal::Call* call, internal::ServerReactor* reactor) : call_(*call), + reactor_(reactor), has_tag_(false), tag_(nullptr), core_cq_tag_(this), @@ -124,9 +126,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { return; } /* Start a dummy op so that we can return the tag */ - GPR_CODEGEN_ASSERT(GRPC_CALL_OK == - g_core_codegen_interface->grpc_call_start_batch( - call_.call(), nullptr, 0, this, nullptr)); + GPR_CODEGEN_ASSERT( + GRPC_CALL_OK == + grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr)); } private: @@ -136,13 +138,14 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { } internal::Call call_; + internal::ServerReactor* reactor_; bool has_tag_; void* tag_; void* core_cq_tag_; std::mutex mu_; int refs_; bool finalized_; - int cancelled_; + int cancelled_; // This is an int (not bool) because it is passed to core bool done_intercepting_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; @@ -190,7 +193,16 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { } finalized_ = true; - if (!*status) cancelled_ = 1; + // If for some reason the incoming status is false, mark that as a + // cancellation. + // TODO(vjpai): does this ever happen? + if (!*status) { + cancelled_ = 1; + } + + if (cancelled_ && (reactor_ != nullptr)) { + reactor_->OnCancel(); + } /* Release the lock since we are going to be running through interceptors now */ lock.unlock(); @@ -247,21 +259,29 @@ void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline, ServerContext::~ServerContext() { Clear(); } void ServerContext::Clear() { - if (call_) { - grpc_call_unref(call_); - } + auth_context_.reset(); + initial_metadata_.clear(); + trailing_metadata_.clear(); + client_metadata_.Reset(); if (completion_op_) { completion_op_->Unref(); + completion_op_ = nullptr; completion_tag_.Clear(); } if (rpc_info_) { rpc_info_->Unref(); + rpc_info_ = nullptr; + } + if (call_) { + auto* call = call_; + call_ = nullptr; + grpc_call_unref(call); } - // Don't need to clear out call_, completion_op_, or rpc_info_ because this is - // either called from destructor or just before Setup } -void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { +void ServerContext::BeginCompletionOp(internal::Call* call, + std::function<void(bool)> callback, + internal::ServerReactor* reactor) { GPR_ASSERT(!completion_op_); if (rpc_info_) { rpc_info_->Ref(); @@ -269,10 +289,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { grpc_call_ref(call->call()); completion_op_ = new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) - CompletionOp(call); - if (callback) { - completion_tag_.Set(call->call(), nullptr, completion_op_); + CompletionOp(call, reactor); + if (callback != nullptr) { + completion_tag_.Set(call->call(), std::move(callback), completion_op_); completion_op_->set_core_cq_tag(&completion_tag_); + completion_op_->set_tag(completion_op_); } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } |