diff options
-rw-r--r-- | include/grpc/support/log.h | 1 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/completion_queue.h | 9 | ||||
-rw-r--r-- | include/grpcpp/server.h | 94 | ||||
-rw-r--r-- | include/grpcpp/server_builder.h | 52 | ||||
-rw-r--r-- | src/core/lib/gpr/log_linux.cc | 1 | ||||
-rw-r--r-- | src/core/lib/security/transport/client_auth_filter.cc | 40 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.cc | 35 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.h | 3 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 13 | ||||
-rw-r--r-- | test/core/end2end/cq_verifier.cc | 1 | ||||
-rw-r--r-- | test/core/end2end/tests/cancel_after_invoke.cc | 1 | ||||
-rw-r--r-- | test/core/end2end/tests/cancel_before_invoke.cc | 1 | ||||
-rw-r--r-- | test/core/end2end/tests/cancel_with_status.cc | 1 | ||||
-rw-r--r-- | test/core/end2end/tests/negative_deadline.cc | 1 | ||||
-rw-r--r-- | test/core/gpr/mpscq_test.cc | 1 | ||||
-rw-r--r-- | test/core/gpr/time_test.cc | 1 | ||||
-rw-r--r-- | test/core/util/test_config.cc | 1 | ||||
-rw-r--r-- | test/cpp/thread_manager/thread_manager_test.cc | 1 |
18 files changed, 149 insertions, 108 deletions
diff --git a/include/grpc/support/log.h b/include/grpc/support/log.h index ccb4b304cc..2236703db3 100644 --- a/include/grpc/support/log.h +++ b/include/grpc/support/log.h @@ -21,7 +21,6 @@ #include <grpc/impl/codegen/port_platform.h> -#include <inttypes.h> #include <stdarg.h> #include <stdlib.h> /* for abort() */ diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 80c7c41982..9713333cf5 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -365,9 +365,11 @@ class ServerCompletionQueue : public CompletionQueue { public: bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; } + protected: + /// Default constructor + ServerCompletionQueue() {} + private: - grpc_cq_polling_type polling_type_; - friend class ServerBuilder; /// \param is_frequently_polled Informs the GRPC library about whether the /// server completion queue would be actively polled (by calling Next() or /// AsyncNext()). By default all server completion queues are assumed to be @@ -376,6 +378,9 @@ class ServerCompletionQueue : public CompletionQueue { : CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type}), polling_type_(polling_type) {} + + grpc_cq_polling_type polling_type_; + friend class ServerBuilder; }; } // namespace grpc diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index f99a6c26b4..e88e7966dc 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -49,7 +49,7 @@ class ServerInitializer; /// /// Use a \a grpc::ServerBuilder to create, configure, and start /// \a Server instances. -class Server final : public ServerInterface, private GrpcLibraryCodegen { +class Server : public ServerInterface, private GrpcLibraryCodegen { public: ~Server(); @@ -87,7 +87,8 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// application and is shared among all \a Server objects. static void SetGlobalCallbacks(GlobalCallbacks* callbacks); - // Returns a \em raw pointer to the underlying \a grpc_server instance. + /// Returns a \em raw pointer to the underlying \a grpc_server instance. + /// EXPERIMENTAL: for internal/test use only grpc_server* c_server(); /// Returns the health check service. @@ -98,24 +99,26 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// Establish a channel for in-process communication std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args); - private: - friend class AsyncGenericService; - friend class ServerBuilder; - friend class ServerInitializer; - - class SyncRequest; - class AsyncRequest; - class ShutdownRequest; - - /// SyncRequestThreadManager is an implementation of ThreadManager. This class - /// is responsible for polling for incoming RPCs and calling the RPC handlers. - /// This is only used in case of a Sync server (i.e a server exposing a sync - /// interface) - class SyncRequestThreadManager; + protected: + /// Register a service. This call does not take ownership of the service. + /// The service must exist for the lifetime of the Server instance. + bool RegisterService(const grpc::string* host, Service* service) override; - class UnimplementedAsyncRequestContext; - class UnimplementedAsyncRequest; - class UnimplementedAsyncResponse; + /// Try binding the server to the given \a addr endpoint + /// (port, and optionally including IP address to bind to). + /// + /// It can be invoked multiple times. Should be used before + /// starting the server. + /// + /// \param addr The address to try to bind to the server (eg, localhost:1234, + /// 192.168.1.1:31416, [::1]:27182, etc.). + /// \param creds The credentials associated with the server. + /// + /// \return bound port number on success, 0 on failure. + /// + /// \warning It is an error to call this method on an already started server. + int AddListeningPort(const grpc::string& addr, + ServerCredentials* creds) override; /// Server constructors. To be used by \a ServerBuilder only. /// @@ -143,30 +146,6 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { sync_server_cqs, int min_pollers, int max_pollers, int sync_cq_timeout_msec); - /// Register a service. This call does not take ownership of the service. - /// The service must exist for the lifetime of the Server instance. - bool RegisterService(const grpc::string* host, Service* service) override; - - /// Register a generic service. This call does not take ownership of the - /// service. The service must exist for the lifetime of the Server instance. - void RegisterAsyncGenericService(AsyncGenericService* service) override; - - /// Try binding the server to the given \a addr endpoint - /// (port, and optionally including IP address to bind to). - /// - /// It can be invoked multiple times. Should be used before - /// starting the server. - /// - /// \param addr The address to try to bind to the server (eg, localhost:1234, - /// 192.168.1.1:31416, [::1]:27182, etc.). - /// \param creds The credentials associated with the server. - /// - /// \return bound port number on success, 0 on failure. - /// - /// \warning It is an error to call this method on an already started server. - int AddListeningPort(const grpc::string& addr, - ServerCredentials* creds) override; - /// Start the server. /// /// \param cqs Completion queues for handling asynchronous services. The @@ -175,6 +154,31 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// \param num_cqs How many completion queues does \a cqs hold. void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; + grpc_server* server() override { return server_; }; + + private: + friend class AsyncGenericService; + friend class ServerBuilder; + friend class ServerInitializer; + + class SyncRequest; + class AsyncRequest; + class ShutdownRequest; + + /// SyncRequestThreadManager is an implementation of ThreadManager. This class + /// is responsible for polling for incoming RPCs and calling the RPC handlers. + /// This is only used in case of a Sync server (i.e a server exposing a sync + /// interface) + class SyncRequestThreadManager; + + class UnimplementedAsyncRequestContext; + class UnimplementedAsyncRequest; + class UnimplementedAsyncResponse; + + /// Register a generic service. This call does not take ownership of the + /// service. The service must exist for the lifetime of the Server instance. + void RegisterAsyncGenericService(AsyncGenericService* service) override; + void PerformOpsOnCall(internal::CallOpSetInterface* ops, internal::Call* call) override; @@ -184,8 +188,6 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { return max_receive_message_size_; }; - grpc_server* server() override { return server_; }; - ServerInitializer* initializer(); const int max_receive_message_size_; @@ -200,7 +202,7 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// the \a sync_server_cqs) std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; - // Sever status + // Server status std::mutex mu_; bool started_; bool shutdown_; diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index c35a6cf98a..4c8dcf4916 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -52,7 +52,7 @@ class ServerBuilderPluginTest; class ServerBuilder { public: ServerBuilder(); - ~ServerBuilder(); + virtual ~ServerBuilder(); ////////////////////////////////////////////////////////////////////////////// // Primary API's @@ -65,7 +65,7 @@ class ServerBuilder { /// traffic (via AddListeningPort) /// 3. [for async api only] completion queues have been added via /// AddCompletionQueue - std::unique_ptr<Server> BuildAndStart(); + virtual std::unique_ptr<Server> BuildAndStart(); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the \a Server instance returned @@ -210,15 +210,48 @@ class ServerBuilder { /// doc/workarounds.md. ServerBuilder& EnableWorkaround(grpc_workaround_list id); - private: - friend class ::grpc::testing::ServerBuilderPluginTest; - + protected: + /// Experimental, to be deprecated struct Port { grpc::string addr; std::shared_ptr<ServerCredentials> creds; int* selected_port; }; + /// Experimental, to be deprecated + typedef std::unique_ptr<grpc::string> HostString; + struct NamedService { + explicit NamedService(Service* s) : service(s) {} + NamedService(const grpc::string& h, Service* s) + : host(new grpc::string(h)), service(s) {} + HostString host; + Service* service; + }; + + /// Experimental, to be deprecated + std::vector<Port> ports() { return ports_; } + + /// Experimental, to be deprecated + std::vector<NamedService*> services() { + std::vector<NamedService*> service_refs; + for (auto& ptr : services_) { + service_refs.push_back(ptr.get()); + } + return service_refs; + } + + /// Experimental, to be deprecated + std::vector<ServerBuilderOption*> options() { + std::vector<ServerBuilderOption*> option_refs; + for (auto& ptr : options_) { + option_refs.push_back(ptr.get()); + } + return option_refs; + } + + private: + friend class ::grpc::testing::ServerBuilderPluginTest; + struct SyncServerSettings { SyncServerSettings() : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} @@ -238,15 +271,6 @@ class ServerBuilder { int cq_timeout_msec; }; - typedef std::unique_ptr<grpc::string> HostString; - struct NamedService { - explicit NamedService(Service* s) : service(s) {} - NamedService(const grpc::string& h, Service* s) - : host(new grpc::string(h)), service(s) {} - HostString host; - Service* service; - }; - int max_receive_message_size_; int max_send_message_size_; std::vector<std::unique_ptr<ServerBuilderOption>> options_; diff --git a/src/core/lib/gpr/log_linux.cc b/src/core/lib/gpr/log_linux.cc index d743eedf38..e4417d9d5d 100644 --- a/src/core/lib/gpr/log_linux.cc +++ b/src/core/lib/gpr/log_linux.cc @@ -32,6 +32,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include <inttypes.h> #include <stdarg.h> #include <stdio.h> #include <string.h> diff --git a/src/core/lib/security/transport/client_auth_filter.cc b/src/core/lib/security/transport/client_auth_filter.cc index d6ca8ee8f8..048e390a71 100644 --- a/src/core/lib/security/transport/client_auth_filter.cc +++ b/src/core/lib/security/transport/client_auth_filter.cc @@ -45,8 +45,6 @@ struct call_data { grpc_call_stack* owning_call; grpc_call_combiner* call_combiner; grpc_call_credentials* creds; - bool have_host; - bool have_method; grpc_slice host; grpc_slice method; /* pollset{_set} bound to this call; if we need to make external @@ -294,27 +292,15 @@ static void auth_start_transport_stream_op_batch( } if (batch->send_initial_metadata) { - for (grpc_linked_mdelem* l = batch->payload->send_initial_metadata - .send_initial_metadata->list.head; - l != nullptr; l = l->next) { - grpc_mdelem md = l->md; - /* Pointer comparison is OK for md_elems created from the same context. - */ - if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) { - if (calld->have_host) { - grpc_slice_unref_internal(calld->host); - } - calld->host = grpc_slice_ref_internal(GRPC_MDVALUE(md)); - calld->have_host = true; - } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) { - if (calld->have_method) { - grpc_slice_unref_internal(calld->method); - } - calld->method = grpc_slice_ref_internal(GRPC_MDVALUE(md)); - calld->have_method = true; - } + grpc_metadata_batch* metadata = + batch->payload->send_initial_metadata.send_initial_metadata; + if (metadata->idx.named.path != nullptr) { + calld->method = + grpc_slice_ref_internal(GRPC_MDVALUE(metadata->idx.named.path->md)); } - if (calld->have_host) { + if (metadata->idx.named.authority != nullptr) { + calld->host = grpc_slice_ref_internal( + GRPC_MDVALUE(metadata->idx.named.authority->md)); batch->handler_private.extra_arg = elem; GRPC_CALL_STACK_REF(calld->owning_call, "check_call_host"); GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch, @@ -351,6 +337,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); calld->owning_call = args->call_stack; calld->call_combiner = args->call_combiner; + calld->host = grpc_empty_slice(); + calld->method = grpc_empty_slice(); return GRPC_ERROR_NONE; } @@ -367,12 +355,8 @@ static void destroy_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); grpc_credentials_mdelem_array_destroy(&calld->md_array); grpc_call_credentials_unref(calld->creds); - if (calld->have_host) { - grpc_slice_unref_internal(calld->host); - } - if (calld->have_method) { - grpc_slice_unref_internal(calld->method); - } + grpc_slice_unref_internal(calld->host); + grpc_slice_unref_internal(calld->method); grpc_auth_metadata_context_reset(&calld->auth_md_context); } diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc index 1aaf40fb99..cb15a71a91 100644 --- a/src/core/lib/transport/byte_stream.cc +++ b/src/core/lib/transport/byte_stream.cc @@ -79,17 +79,19 @@ void SliceBufferByteStream::Shutdown(grpc_error* error) { // ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream) - : underlying_stream_(std::move(underlying_stream)) { + : underlying_stream_(std::move(underlying_stream)), + length_(underlying_stream_->length()), + flags_(underlying_stream_->flags()) { grpc_slice_buffer_init(&cache_buffer_); } -ByteStreamCache::~ByteStreamCache() { - if (underlying_stream_ != nullptr) Destroy(); -} +ByteStreamCache::~ByteStreamCache() { Destroy(); } void ByteStreamCache::Destroy() { underlying_stream_.reset(); - grpc_slice_buffer_destroy_internal(&cache_buffer_); + if (cache_buffer_.length > 0) { + grpc_slice_buffer_destroy_internal(&cache_buffer_); + } } // @@ -97,9 +99,7 @@ void ByteStreamCache::Destroy() { // ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache) - : ByteStream(cache->underlying_stream_->length(), - cache->underlying_stream_->flags()), - cache_(cache) {} + : ByteStream(cache->length_, cache->flags_), cache_(cache) {} ByteStreamCache::CachingByteStream::~CachingByteStream() {} @@ -115,6 +115,7 @@ bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint, grpc_closure* on_complete) { if (shutdown_error_ != GRPC_ERROR_NONE) return true; if (cursor_ < cache_->cache_buffer_.count) return true; + GPR_ASSERT(cache_->underlying_stream_ != nullptr); return cache_->underlying_stream_->Next(max_size_hint, on_complete); } @@ -125,13 +126,20 @@ grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { if (cursor_ < cache_->cache_buffer_.count) { *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]); ++cursor_; + offset_ += GRPC_SLICE_LENGTH(*slice); return GRPC_ERROR_NONE; } + GPR_ASSERT(cache_->underlying_stream_ != nullptr); grpc_error* error = cache_->underlying_stream_->Pull(slice); if (error == GRPC_ERROR_NONE) { - ++cursor_; grpc_slice_buffer_add(&cache_->cache_buffer_, grpc_slice_ref_internal(*slice)); + ++cursor_; + offset_ += GRPC_SLICE_LENGTH(*slice); + // Orphan the underlying stream if it's been drained. + if (offset_ == cache_->underlying_stream_->length()) { + cache_->underlying_stream_.reset(); + } } return error; } @@ -139,9 +147,14 @@ grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) { GRPC_ERROR_UNREF(shutdown_error_); shutdown_error_ = GRPC_ERROR_REF(error); - cache_->underlying_stream_->Shutdown(error); + if (cache_->underlying_stream_ != nullptr) { + cache_->underlying_stream_->Shutdown(error); + } } -void ByteStreamCache::CachingByteStream::Reset() { cursor_ = 0; } +void ByteStreamCache::CachingByteStream::Reset() { + cursor_ = 0; + offset_ = 0; +} } // namespace grpc_core diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index f8243ac40d..95a756a5e4 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -139,6 +139,7 @@ class ByteStreamCache { private: ByteStreamCache* cache_; size_t cursor_ = 0; + size_t offset_ = 0; grpc_error* shutdown_error_ = GRPC_ERROR_NONE; }; @@ -153,6 +154,8 @@ class ByteStreamCache { private: OrphanablePtr<ByteStream> underlying_stream_; + uint32_t length_; + uint32_t flags_; grpc_slice_buffer cache_buffer_; }; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 5638636e67..760aaa4b4d 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -382,11 +382,12 @@ Server::Server( global_callbacks_ = g_callbacks; global_callbacks_->UpdateArguments(args); - for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); - it++) { - sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( - this, (*it).get(), global_callbacks_, min_pollers, max_pollers, - sync_cq_timeout_msec)); + if (sync_server_cqs_ != nullptr) { + for (const auto& it : *sync_server_cqs_) { + sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( + this, it.get(), global_callbacks_, min_pollers, max_pollers, + sync_cq_timeout_msec)); + } } grpc_channel_args channel_args; @@ -525,7 +526,7 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // explicit one. if (health_check_service_ == nullptr && !health_check_service_disabled_ && DefaultHealthCheckServiceEnabled()) { - if (sync_server_cqs_->empty()) { + if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) { gpr_log(GPR_INFO, "Default health check service disabled at async-only server."); } else { diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc index c3a3f43fe1..f7e64effcd 100644 --- a/test/core/end2end/cq_verifier.cc +++ b/test/core/end2end/cq_verifier.cc @@ -18,6 +18,7 @@ #include "test/core/end2end/cq_verifier.h" +#include <inttypes.h> #include <stdarg.h> #include <stdio.h> #include <string.h> diff --git a/test/core/end2end/tests/cancel_after_invoke.cc b/test/core/end2end/tests/cancel_after_invoke.cc index a3e36e0e3f..72f724281e 100644 --- a/test/core/end2end/tests/cancel_after_invoke.cc +++ b/test/core/end2end/tests/cancel_after_invoke.cc @@ -18,6 +18,7 @@ #include "test/core/end2end/end2end_tests.h" +#include <inttypes.h> #include <stdio.h> #include <string.h> diff --git a/test/core/end2end/tests/cancel_before_invoke.cc b/test/core/end2end/tests/cancel_before_invoke.cc index e9fa0466fb..c7d3ed5a4f 100644 --- a/test/core/end2end/tests/cancel_before_invoke.cc +++ b/test/core/end2end/tests/cancel_before_invoke.cc @@ -18,6 +18,7 @@ #include "test/core/end2end/end2end_tests.h" +#include <inttypes.h> #include <stdio.h> #include <string.h> diff --git a/test/core/end2end/tests/cancel_with_status.cc b/test/core/end2end/tests/cancel_with_status.cc index 6820ba5a2f..887da85939 100644 --- a/test/core/end2end/tests/cancel_with_status.cc +++ b/test/core/end2end/tests/cancel_with_status.cc @@ -18,6 +18,7 @@ #include "test/core/end2end/end2end_tests.h" +#include <inttypes.h> #include <stdio.h> #include <string.h> diff --git a/test/core/end2end/tests/negative_deadline.cc b/test/core/end2end/tests/negative_deadline.cc index b28bee0fb2..dce3a02d25 100644 --- a/test/core/end2end/tests/negative_deadline.cc +++ b/test/core/end2end/tests/negative_deadline.cc @@ -18,6 +18,7 @@ #include "test/core/end2end/end2end_tests.h" +#include <inttypes.h> #include <stdio.h> #include <string.h> diff --git a/test/core/gpr/mpscq_test.cc b/test/core/gpr/mpscq_test.cc index 8c0873941f..f51bdf8c50 100644 --- a/test/core/gpr/mpscq_test.cc +++ b/test/core/gpr/mpscq_test.cc @@ -18,6 +18,7 @@ #include "src/core/lib/gpr/mpscq.h" +#include <inttypes.h> #include <stdlib.h> #include <grpc/support/alloc.h> diff --git a/test/core/gpr/time_test.cc b/test/core/gpr/time_test.cc index c80aac649d..6f070f58df 100644 --- a/test/core/gpr/time_test.cc +++ b/test/core/gpr/time_test.cc @@ -21,6 +21,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include <inttypes.h> #include <limits.h> #include <stdio.h> #include <stdlib.h> diff --git a/test/core/util/test_config.cc b/test/core/util/test_config.cc index 53a6297d6e..6a0d444a73 100644 --- a/test/core/util/test_config.cc +++ b/test/core/util/test_config.cc @@ -18,6 +18,7 @@ #include "test/core/util/test_config.h" +#include <inttypes.h> #include <signal.h> #include <stdbool.h> #include <stdio.h> diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc index fad52cf7fa..7a95a9f17d 100644 --- a/test/cpp/thread_manager/thread_manager_test.cc +++ b/test/cpp/thread_manager/thread_manager_test.cc @@ -16,6 +16,7 @@ *is % allowed in string */ +#include <inttypes.h> #include <ctime> #include <memory> #include <string> |