aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-08-10 13:39:52 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-08-10 15:08:41 -0700
commitbeac88ca56f4710e86668f2cbbd80e02e0607f9c (patch)
tree84ee80880d363b655b97eb964ef3adfe0912c5be /src/cpp/server
parent47245f509cd9d8ec807b6bdeb6958243d9676fa5 (diff)
Server: added the ability to disable compression algorithm
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/server.cc25
-rw-r--r--src/cpp/server/server_builder.cc53
2 files changed, 55 insertions, 23 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index ab87b22f5f..6e576ab8b3 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -163,27 +163,34 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
-static grpc_server* CreateServer(int max_message_size) {
+static grpc_server* CreateServer(
+ int max_message_size, const grpc_compression_options& compression_options) {
if (max_message_size > 0) {
- grpc_arg arg;
- arg.type = GRPC_ARG_INTEGER;
- arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
- arg.value.integer = max_message_size;
- grpc_channel_args args = {1, &arg};
- return grpc_server_create(&args);
+ grpc_arg args[2];
+ args[0].type = GRPC_ARG_INTEGER;
+ args[0].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
+ args[0].value.integer = max_message_size;
+
+ args[1].type = GRPC_ARG_INTEGER;
+ args[1].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG);
+ args[1].value.integer = compression_options.enabled_algorithms_bitset;
+
+ grpc_channel_args channel_args = {2, args};
+ return grpc_server_create(&channel_args);
} else {
return grpc_server_create(nullptr);
}
}
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_message_size)
+ int max_message_size,
+ grpc_compression_options compression_options)
: max_message_size_(max_message_size),
started_(false),
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
- server_(CreateServer(max_message_size)),
+ server_(CreateServer(max_message_size, compression_options)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
grpc_server_register_completion_queue(server_, cq_.cq());
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index f723d4611a..425b052128 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -42,7 +42,9 @@
namespace grpc {
ServerBuilder::ServerBuilder()
- : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {}
+ : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {
+ grpc_compression_options_init(&compression_options_);
+}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
ServerCompletionQueue* cq = new ServerCompletionQueue();
@@ -50,44 +52,65 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
return std::unique_ptr<ServerCompletionQueue>(cq);
}
-void ServerBuilder::RegisterService(SynchronousService* service) {
+ServerBuilder& ServerBuilder::RegisterService(SynchronousService* service) {
services_.emplace_back(new NamedService<RpcService>(service->service()));
+ return *this;
}
-void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
+ServerBuilder& ServerBuilder::RegisterAsyncService(
+ AsynchronousService* service) {
async_services_.emplace_back(new NamedService<AsynchronousService>(service));
+ return *this;
}
-void ServerBuilder::RegisterService(
+ServerBuilder& ServerBuilder::RegisterService(
const grpc::string& addr, SynchronousService* service) {
services_.emplace_back(new NamedService<RpcService>(addr, service->service()));
+ return *this;
}
-void ServerBuilder::RegisterAsyncService(
+ServerBuilder& ServerBuilder::RegisterAsyncService(
const grpc::string& addr, AsynchronousService* service) {
- async_services_.emplace_back(new NamedService<AsynchronousService>(addr, service));
+ async_services_.emplace_back(
+ new NamedService<AsynchronousService>(addr, service));
+ return *this;
}
-void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
+ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
+ AsyncGenericService* service) {
if (generic_service_) {
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
service);
- return;
+ } else {
+ generic_service_ = service;
}
- generic_service_ = service;
+ return *this;
+}
+
+ServerBuilder& ServerBuilder::SetMaxMessageSize(int max_message_size) {
+ max_message_size_ = max_message_size;
+ return *this;
}
-void ServerBuilder::AddListeningPort(const grpc::string& addr,
+ServerBuilder& ServerBuilder::AddListeningPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
Port port = {addr, creds, selected_port};
ports_.push_back(port);
+ return *this;
}
-void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) {
+ServerBuilder& ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) {
thread_pool_ = thread_pool;
+ return *this;
+}
+
+ServerBuilder& ServerBuilder::SetCompressionOptions(
+ const grpc_compression_options& options) {
+ compression_options_ = options;
+ return *this;
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
@@ -100,8 +123,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true;
}
- std::unique_ptr<Server> server(
- new Server(thread_pool_, thread_pool_owned, max_message_size_));
+ std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned,
+ max_message_size_,
+ compression_options_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq());
}
@@ -113,7 +137,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
for (auto service = async_services_.begin();
service != async_services_.end(); service++) {
- if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) {
+ if (!server->RegisterAsyncService((*service)->host.get(),
+ (*service)->service)) {
return nullptr;
}
}