diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc/grpc.h | 3 | ||||
-rw-r--r-- | include/grpc/impl/codegen/port_platform.h | 9 | ||||
-rw-r--r-- | include/grpcpp/health_check_service_interface.h | 4 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/channel_interface.h | 1 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_callback.h | 32 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_context.h | 6 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_interceptor.h | 48 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_context.h | 4 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_interceptor.h | 25 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_interface.h | 18 |
10 files changed, 118 insertions, 32 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index d3b74cabab..fec7f5269e 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -511,7 +511,8 @@ GRPCAPI char* grpc_channelz_get_server(intptr_t server_id); /* Gets all server sockets that exist in the server. */ GRPCAPI char* grpc_channelz_get_server_sockets(intptr_t server_id, - intptr_t start_socket_id); + intptr_t start_socket_id, + intptr_t max_results); /* Returns a single Channel, or else a NOT_FOUND code. The returned string is allocated and must be freed by the application. */ diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index b2028a6305..031c0c36ae 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -526,6 +526,15 @@ typedef unsigned __int64 uint64_t; #endif /* GPR_ATTRIBUTE_NO_TSAN (2) */ #endif /* GPR_ATTRIBUTE_NO_TSAN (1) */ +/* GRPC_TSAN_ENABLED will be defined, when compiled with thread sanitizer. */ +#if defined(__SANITIZE_THREAD__) +#define GRPC_TSAN_ENABLED +#elif defined(__has_feature) +#if __has_feature(thread_sanitizer) +#define GRPC_TSAN_ENABLED +#endif +#endif + /* GRPC_ALLOW_EXCEPTIONS should be 0 or 1 if exceptions are allowed or not */ #ifndef GRPC_ALLOW_EXCEPTIONS /* If not already set, set to 1 on Windows (style guide standard) but to diff --git a/include/grpcpp/health_check_service_interface.h b/include/grpcpp/health_check_service_interface.h index b45a699bda..dfd4c3983a 100644 --- a/include/grpcpp/health_check_service_interface.h +++ b/include/grpcpp/health_check_service_interface.h @@ -37,6 +37,10 @@ class HealthCheckServiceInterface { bool serving) = 0; /// Apply to all registered service names. virtual void SetServingStatus(bool serving) = 0; + + /// Set all registered service names to not serving and prevent future + /// state changes. + virtual void Shutdown() {} }; /// Enable/disable the default health checking service. This applies to all C++ diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index 728a7b9049..5353f5feaa 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -21,7 +21,6 @@ #include <grpc/impl/codegen/connectivity_state.h> #include <grpcpp/impl/codegen/call.h> -#include <grpcpp/impl/codegen/client_context.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/time.h> diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 4d9579fd6a..66cf9b7754 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -255,10 +255,12 @@ class ClientCallbackReaderWriterImpl void MaybeFinish() { if (--callbacks_outstanding_ == 0) { - reactor_->OnDone(finish_status_); + Status s = std::move(finish_status_); + auto* reactor = reactor_; auto* call = call_.call(); this->~ClientCallbackReaderWriterImpl(); g_core_codegen_interface->grpc_call_unref(call); + reactor->OnDone(s); } } @@ -268,6 +270,7 @@ class ClientCallbackReaderWriterImpl // 2. Any read backlog // 3. Recv trailing metadata, on_completion callback // 4. Any write backlog + // 5. See if the call can finish (if other callbacks were triggered already) started_ = true; start_tag_.Set(call_.call(), @@ -318,6 +321,7 @@ class ClientCallbackReaderWriterImpl if (writes_done_ops_at_start_) { call_.PerformOps(&writes_done_ops_); } + MaybeFinish(); } void Read(Response* msg) override { @@ -410,8 +414,8 @@ class ClientCallbackReaderWriterImpl CallbackWithSuccessTag read_tag_; bool read_ops_at_start_{false}; - // Minimum of 2 outstanding callbacks to pre-register for start and finish - std::atomic_int callbacks_outstanding_{2}; + // Minimum of 3 callbacks to pre-register for StartCall, start, and finish + std::atomic_int callbacks_outstanding_{3}; bool started_{false}; }; @@ -450,10 +454,12 @@ class ClientCallbackReaderImpl void MaybeFinish() { if (--callbacks_outstanding_ == 0) { - reactor_->OnDone(finish_status_); + Status s = std::move(finish_status_); + auto* reactor = reactor_; auto* call = call_.call(); this->~ClientCallbackReaderImpl(); g_core_codegen_interface->grpc_call_unref(call); + reactor->OnDone(s); } } @@ -462,6 +468,7 @@ class ClientCallbackReaderImpl // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any backlog // 3. Recv trailing metadata, on_completion callback + // 4. See if the call can finish (if other callbacks were triggered already) started_ = true; start_tag_.Set(call_.call(), @@ -493,6 +500,8 @@ class ClientCallbackReaderImpl finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_ops_); + + MaybeFinish(); } void Read(Response* msg) override { @@ -536,8 +545,8 @@ class ClientCallbackReaderImpl CallbackWithSuccessTag read_tag_; bool read_ops_at_start_{false}; - // Minimum of 2 outstanding callbacks to pre-register for start and finish - std::atomic_int callbacks_outstanding_{2}; + // Minimum of 3 callbacks to pre-register for StartCall, start, and finish + std::atomic_int callbacks_outstanding_{3}; bool started_{false}; }; @@ -576,10 +585,12 @@ class ClientCallbackWriterImpl void MaybeFinish() { if (--callbacks_outstanding_ == 0) { - reactor_->OnDone(finish_status_); + Status s = std::move(finish_status_); + auto* reactor = reactor_; auto* call = call_.call(); this->~ClientCallbackWriterImpl(); g_core_codegen_interface->grpc_call_unref(call); + reactor->OnDone(s); } } @@ -588,6 +599,7 @@ class ClientCallbackWriterImpl // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Recv trailing metadata, on_completion callback // 3. Any backlog + // 4. See if the call can finish (if other callbacks were triggered already) started_ = true; start_tag_.Set(call_.call(), @@ -627,6 +639,8 @@ class ClientCallbackWriterImpl if (writes_done_ops_at_start_) { call_.PerformOps(&writes_done_ops_); } + + MaybeFinish(); } void Write(const Request* msg, WriteOptions options) override { @@ -708,8 +722,8 @@ class ClientCallbackWriterImpl CallbackWithSuccessTag writes_done_tag_; bool writes_done_ops_at_start_{false}; - // Minimum of 2 outstanding callbacks to pre-register for start and finish - std::atomic_int callbacks_outstanding_{2}; + // Minimum of 3 callbacks to pre-register for StartCall, start, and finish + std::atomic_int callbacks_outstanding_{3}; bool started_{false}; }; diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index 142cfa35dd..0a71f3d9b6 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -46,6 +46,7 @@ #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/create_auth_context.h> #include <grpcpp/impl/codegen/metadata_map.h> +#include <grpcpp/impl/codegen/rpc_method.h> #include <grpcpp/impl/codegen/security/auth_context.h> #include <grpcpp/impl/codegen/slice.h> #include <grpcpp/impl/codegen/status.h> @@ -418,12 +419,13 @@ class ClientContext { void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel); experimental::ClientRpcInfo* set_client_rpc_info( - const char* method, grpc::ChannelInterface* channel, + const char* method, internal::RpcMethod::RpcType type, + grpc::ChannelInterface* channel, const std::vector< std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>& creators, size_t interceptor_pos) { - rpc_info_ = experimental::ClientRpcInfo(this, method, channel); + rpc_info_ = experimental::ClientRpcInfo(this, type, method, channel); rpc_info_.RegisterInterceptors(creators, interceptor_pos); return &rpc_info_; } diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h index f69c99ab22..2bae11a251 100644 --- a/include/grpcpp/impl/codegen/client_interceptor.h +++ b/include/grpcpp/impl/codegen/client_interceptor.h @@ -23,6 +23,7 @@ #include <vector> #include <grpcpp/impl/codegen/interceptor.h> +#include <grpcpp/impl/codegen/rpc_method.h> #include <grpcpp/impl/codegen/string_ref.h> namespace grpc { @@ -52,23 +53,56 @@ extern experimental::ClientInterceptorFactoryInterface* namespace experimental { class ClientRpcInfo { public: - ClientRpcInfo() {} + // TODO(yashykt): Stop default-constructing ClientRpcInfo and remove UNKNOWN + // from the list of possible Types. + enum class Type { + UNARY, + CLIENT_STREAMING, + SERVER_STREAMING, + BIDI_STREAMING, + UNKNOWN // UNKNOWN is not API and will be removed later + }; ~ClientRpcInfo(){}; ClientRpcInfo(const ClientRpcInfo&) = delete; ClientRpcInfo(ClientRpcInfo&&) = default; - ClientRpcInfo& operator=(ClientRpcInfo&&) = default; // Getter methods - const char* method() { return method_; } + const char* method() const { return method_; } ChannelInterface* channel() { return channel_; } grpc::ClientContext* client_context() { return ctx_; } + Type type() const { return type_; } private: - ClientRpcInfo(grpc::ClientContext* ctx, const char* method, - grpc::ChannelInterface* channel) - : ctx_(ctx), method_(method), channel_(channel) {} + static_assert(Type::UNARY == + static_cast<Type>(internal::RpcMethod::NORMAL_RPC), + "violated expectation about Type enum"); + static_assert(Type::CLIENT_STREAMING == + static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::SERVER_STREAMING == + static_cast<Type>(internal::RpcMethod::SERVER_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::BIDI_STREAMING == + static_cast<Type>(internal::RpcMethod::BIDI_STREAMING), + "violated expectation about Type enum"); + + // Default constructor should only be used by ClientContext + ClientRpcInfo() = default; + + // Constructor will only be called from ClientContext + ClientRpcInfo(grpc::ClientContext* ctx, internal::RpcMethod::RpcType type, + const char* method, grpc::ChannelInterface* channel) + : ctx_(ctx), + type_(static_cast<Type>(type)), + method_(method), + channel_(channel) {} + + // Move assignment should only be used by ClientContext + // TODO(yashykt): Delete move assignment + ClientRpcInfo& operator=(ClientRpcInfo&&) = default; + // Runs interceptor at pos \a pos. void RunInterceptor( experimental::InterceptorBatchMethods* interceptor_methods, size_t pos) { @@ -97,6 +131,8 @@ class ClientRpcInfo { } grpc::ClientContext* ctx_ = nullptr; + // TODO(yashykt): make type_ const once move-assignment is deleted + Type type_{Type::UNKNOWN}; const char* method_ = nullptr; grpc::ChannelInterface* channel_ = nullptr; std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_; diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 4a5f9e2dd9..ccb5925e7d 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -314,12 +314,12 @@ class ServerContext { uint32_t initial_metadata_flags() const { return 0; } experimental::ServerRpcInfo* set_server_rpc_info( - const char* method, + const char* method, internal::RpcMethod::RpcType type, const std::vector< std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>& creators) { if (creators.size() != 0) { - rpc_info_ = new experimental::ServerRpcInfo(this, method); + rpc_info_ = new experimental::ServerRpcInfo(this, method, type); rpc_info_->RegisterInterceptors(creators); } return rpc_info_; diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h index 5fb5df28b7..cd7c0600b6 100644 --- a/include/grpcpp/impl/codegen/server_interceptor.h +++ b/include/grpcpp/impl/codegen/server_interceptor.h @@ -23,6 +23,7 @@ #include <vector> #include <grpcpp/impl/codegen/interceptor.h> +#include <grpcpp/impl/codegen/rpc_method.h> #include <grpcpp/impl/codegen/string_ref.h> namespace grpc { @@ -44,6 +45,8 @@ class ServerInterceptorFactoryInterface { class ServerRpcInfo { public: + enum class Type { UNARY, CLIENT_STREAMING, SERVER_STREAMING, BIDI_STREAMING }; + ~ServerRpcInfo(){}; ServerRpcInfo(const ServerRpcInfo&) = delete; @@ -51,12 +54,27 @@ class ServerRpcInfo { ServerRpcInfo& operator=(ServerRpcInfo&&) = default; // Getter methods - const char* method() { return method_; } + const char* method() const { return method_; } + Type type() const { return type_; } grpc::ServerContext* server_context() { return ctx_; } private: - ServerRpcInfo(grpc::ServerContext* ctx, const char* method) - : ctx_(ctx), method_(method) { + static_assert(Type::UNARY == + static_cast<Type>(internal::RpcMethod::NORMAL_RPC), + "violated expectation about Type enum"); + static_assert(Type::CLIENT_STREAMING == + static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::SERVER_STREAMING == + static_cast<Type>(internal::RpcMethod::SERVER_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::BIDI_STREAMING == + static_cast<Type>(internal::RpcMethod::BIDI_STREAMING), + "violated expectation about Type enum"); + + ServerRpcInfo(grpc::ServerContext* ctx, const char* method, + internal::RpcMethod::RpcType type) + : ctx_(ctx), method_(method), type_(static_cast<Type>(type)) { ref_.store(1); } @@ -86,6 +104,7 @@ class ServerRpcInfo { grpc::ServerContext* ctx_ = nullptr; const char* method_ = nullptr; + const Type type_; std::atomic_int ref_; std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_; diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index 55c94f4d2f..e0e2629827 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -174,13 +174,14 @@ class ServerInterface : public internal::CallHook { bool done_intercepting_; }; + /// RegisteredAsyncRequest is not part of the C++ API class RegisteredAsyncRequest : public BaseAsyncRequest { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, - const char* name); + const char* name, internal::RpcMethod::RpcType type); virtual bool FinalizeResult(void** tag, bool* status) override { /* If we are done intercepting, then there is nothing more for us to do */ @@ -189,7 +190,7 @@ class ServerInterface : public internal::CallHook { } call_wrapper_ = internal::Call( call_, server_, call_cq_, server_->max_receive_message_size(), - context_->set_server_rpc_info(name_, + context_->set_server_rpc_info(name_, type_, *server_->interceptor_creators())); return BaseAsyncRequest::FinalizeResult(tag, status); } @@ -198,6 +199,7 @@ class ServerInterface : public internal::CallHook { void IssueRequest(void* registered_method, grpc_byte_buffer** payload, ServerCompletionQueue* notification_cq); const char* name_; + const internal::RpcMethod::RpcType type_; }; class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { @@ -207,9 +209,9 @@ class ServerInterface : public internal::CallHook { internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) - : RegisteredAsyncRequest(server, context, stream, call_cq, - notification_cq, tag, - registered_method->name()) { + : RegisteredAsyncRequest( + server, context, stream, call_cq, notification_cq, tag, + registered_method->name(), registered_method->method_type()) { IssueRequest(registered_method->server_tag(), nullptr, notification_cq); } @@ -225,9 +227,9 @@ class ServerInterface : public internal::CallHook { CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* request) - : RegisteredAsyncRequest(server, context, stream, call_cq, - notification_cq, tag, - registered_method->name()), + : RegisteredAsyncRequest( + server, context, stream, call_cq, notification_cq, tag, + registered_method->name(), registered_method->method_type()), registered_method_(registered_method), server_(server), context_(context), |