diff options
author | Yang Gao <yangg@google.com> | 2015-04-30 16:07:06 -0700 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-04-30 16:07:06 -0700 |
commit | 3921c56bee2adff62cb0f9519114d2aa22a67410 (patch) | |
tree | 7fdd42a532ca2040b15441d92bcd0b66eccf0397 /src/cpp/server | |
parent | 6d42a73bb984e19eb1bf84e2a952eec861cea464 (diff) |
Expose max message size at the server side
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/server.cc | 30 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 5 |
2 files changed, 26 insertions, 9 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 4694a3a7ff..d8f8ab4b94 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -100,7 +100,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { public: explicit CallData(Server* server, SyncRequest* mrd) : cq_(mrd->cq_), - call_(mrd->call_, server, &cq_), + call_(mrd->call_, server, &cq_, server->max_message_size_), ctx_(mrd->deadline_, mrd->request_metadata_.metadata, mrd->request_metadata_.count), has_request_payload_(mrd->has_request_payload_), @@ -126,7 +126,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { if (has_request_payload_) { GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_.call()); req.reset(method_->AllocateRequestProto()); - if (!DeserializeProto(request_payload_, req.get())) { + if (!DeserializeProto(request_payload_, req.get(), call_.max_message_size())) { abort(); // for now } GRPC_TIMER_MARK(DESER_PROTO_END, call_.call()); @@ -176,12 +176,27 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned) - : started_(false), +grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) { + if (max_message_size > 0) { + grpc_arg arg; + arg.type = GRPC_ARG_INTEGER; + arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); + arg.value.integer = max_message_size; + grpc_channel_args args = {1, &arg}; + return grpc_server_create(cq, &args); + } else { + return grpc_server_create(cq, nullptr); + } +} + +Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, + int max_message_size) + : max_message_size_(max_message_size), + started_(false), shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), - server_(grpc_server_create(cq_.cq(), nullptr)), + server_(CreateServer(cq_.cq(), max_message_size)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) {} @@ -347,7 +362,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { if (*status && request_) { if (payload_) { GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_); - *status = DeserializeProto(payload_, request_); + *status = DeserializeProto(payload_, request_, + server_->max_message_size_); GRPC_TIMER_MARK(DESER_PROTO_END, call_); } else { *status = false; @@ -374,7 +390,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } ctx->call_ = call_; ctx->cq_ = cq_; - Call call(call_, server_, cq_); + Call call(call_, server_, cq_, server_->max_message_size_); if (orig_status && call_) { ctx->BeginCompletionOp(&call); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 81cb0e6724..e48d1eeb42 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -42,7 +42,7 @@ namespace grpc { ServerBuilder::ServerBuilder() - : generic_service_(nullptr), thread_pool_(nullptr) {} + : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); @@ -86,7 +86,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = new ThreadPool(cores); thread_pool_owned = true; } - std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned)); + std::unique_ptr<Server> server( + new Server(thread_pool_, thread_pool_owned, max_message_size_)); for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService(*service)) { |