diff options
author | David Garcia Quintas <dgq@google.com> | 2015-08-10 13:39:52 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-08-10 15:08:41 -0700 |
commit | beac88ca56f4710e86668f2cbbd80e02e0607f9c (patch) | |
tree | 84ee80880d363b655b97eb964ef3adfe0912c5be /src/cpp/server | |
parent | 47245f509cd9d8ec807b6bdeb6958243d9676fa5 (diff) |
Server: added the ability to disable compression algorithm
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/server.cc | 25 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 53 |
2 files changed, 55 insertions, 23 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index ab87b22f5f..6e576ab8b3 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -163,27 +163,34 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -static grpc_server* CreateServer(int max_message_size) { +static grpc_server* CreateServer( + int max_message_size, const grpc_compression_options& compression_options) { if (max_message_size > 0) { - grpc_arg arg; - arg.type = GRPC_ARG_INTEGER; - arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); - arg.value.integer = max_message_size; - grpc_channel_args args = {1, &arg}; - return grpc_server_create(&args); + grpc_arg args[2]; + args[0].type = GRPC_ARG_INTEGER; + args[0].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); + args[0].value.integer = max_message_size; + + args[1].type = GRPC_ARG_INTEGER; + args[1].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG); + args[1].value.integer = compression_options.enabled_algorithms_bitset; + + grpc_channel_args channel_args = {2, args}; + return grpc_server_create(&channel_args); } else { return grpc_server_create(nullptr); } } Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_message_size) + int max_message_size, + grpc_compression_options compression_options) : max_message_size_(max_message_size), started_(false), shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), - server_(CreateServer(max_message_size)), + server_(CreateServer(max_message_size, compression_options)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { grpc_server_register_completion_queue(server_, cq_.cq()); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index f723d4611a..425b052128 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -42,7 +42,9 @@ namespace grpc { ServerBuilder::ServerBuilder() - : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} + : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) { + grpc_compression_options_init(&compression_options_); +} std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { ServerCompletionQueue* cq = new ServerCompletionQueue(); @@ -50,44 +52,65 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { return std::unique_ptr<ServerCompletionQueue>(cq); } -void ServerBuilder::RegisterService(SynchronousService* service) { +ServerBuilder& ServerBuilder::RegisterService(SynchronousService* service) { services_.emplace_back(new NamedService<RpcService>(service->service())); + return *this; } -void ServerBuilder::RegisterAsyncService(AsynchronousService* service) { +ServerBuilder& ServerBuilder::RegisterAsyncService( + AsynchronousService* service) { async_services_.emplace_back(new NamedService<AsynchronousService>(service)); + return *this; } -void ServerBuilder::RegisterService( +ServerBuilder& ServerBuilder::RegisterService( const grpc::string& addr, SynchronousService* service) { services_.emplace_back(new NamedService<RpcService>(addr, service->service())); + return *this; } -void ServerBuilder::RegisterAsyncService( +ServerBuilder& ServerBuilder::RegisterAsyncService( const grpc::string& addr, AsynchronousService* service) { - async_services_.emplace_back(new NamedService<AsynchronousService>(addr, service)); + async_services_.emplace_back( + new NamedService<AsynchronousService>(addr, service)); + return *this; } -void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) { +ServerBuilder& ServerBuilder::RegisterAsyncGenericService( + AsyncGenericService* service) { if (generic_service_) { gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", service); - return; + } else { + generic_service_ = service; } - generic_service_ = service; + return *this; +} + +ServerBuilder& ServerBuilder::SetMaxMessageSize(int max_message_size) { + max_message_size_ = max_message_size; + return *this; } -void ServerBuilder::AddListeningPort(const grpc::string& addr, +ServerBuilder& ServerBuilder::AddListeningPort(const grpc::string& addr, std::shared_ptr<ServerCredentials> creds, int* selected_port) { Port port = {addr, creds, selected_port}; ports_.push_back(port); + return *this; } -void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) { +ServerBuilder& ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) { thread_pool_ = thread_pool; + return *this; +} + +ServerBuilder& ServerBuilder::SetCompressionOptions( + const grpc_compression_options& options) { + compression_options_ = options; + return *this; } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { @@ -100,8 +123,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = CreateDefaultThreadPool(); thread_pool_owned = true; } - std::unique_ptr<Server> server( - new Server(thread_pool_, thread_pool_owned, max_message_size_)); + std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned, + max_message_size_, + compression_options_)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { grpc_server_register_completion_queue(server->server_, (*cq)->cq()); } @@ -113,7 +137,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } for (auto service = async_services_.begin(); service != async_services_.end(); service++) { - if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) { + if (!server->RegisterAsyncService((*service)->host.get(), + (*service)->service)) { return nullptr; } } |