diff options
author | Yang Gao <yangg@google.com> | 2015-04-30 16:07:06 -0700 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-04-30 16:07:06 -0700 |
commit | 3921c56bee2adff62cb0f9519114d2aa22a67410 (patch) | |
tree | 7fdd42a532ca2040b15441d92bcd0b66eccf0397 /src | |
parent | 6d42a73bb984e19eb1bf84e2a952eec861cea464 (diff) |
Expose max message size at the server side
Diffstat (limited to 'src')
-rw-r--r-- | src/cpp/common/call.cc | 13 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.cc | 9 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.h | 3 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 30 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 5 |
5 files changed, 46 insertions, 14 deletions
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 9878133331..25609a7759 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -55,6 +55,7 @@ CallOpBuffer::CallOpBuffer() recv_message_(nullptr), recv_message_buffer_(nullptr), recv_buf_(nullptr), + max_message_size_(-1), client_send_close_(false), recv_trailing_metadata_(nullptr), recv_status_(nullptr), @@ -311,7 +312,7 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { got_message = *status; if (recv_message_) { GRPC_TIMER_MARK(DESER_PROTO_BEGIN, 0); - *status = *status && DeserializeProto(recv_buf_, recv_message_); + *status = *status && DeserializeProto(recv_buf_, recv_message_, max_message_size_); grpc_byte_buffer_destroy(recv_buf_); GRPC_TIMER_MARK(DESER_PROTO_END, 0); } else { @@ -338,9 +339,17 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { } Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) - : call_hook_(call_hook), cq_(cq), call_(call) {} + : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {} + +Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, + int max_message_size) + : call_hook_(call_hook), cq_(cq), call_(call), + max_message_size_(max_message_size) {} void Call::PerformOps(CallOpBuffer* buffer) { + if (max_message_size_ > 0) { + buffer->set_max_message_size(max_message_size_); + } call_hook_->PerformOpsOnCall(buffer, this); } diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index b8de2ea173..8ab536aab8 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -158,9 +158,14 @@ bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) { return msg.SerializeToZeroCopyStream(&writer); } -bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg) { +bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, + int max_message_size) { GrpcBufferReader reader(buffer); - return msg->ParseFromZeroCopyStream(&reader); + ::grpc::protobuf::io::CodedInputStream decoder(&reader); + if (max_message_size > 0) { + decoder.SetTotalBytesLimit(max_message_size, max_message_size); + } + return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage(); } } // namespace grpc diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h index bc60dc9929..67a775b3ca 100644 --- a/src/cpp/proto/proto_utils.h +++ b/src/cpp/proto/proto_utils.h @@ -47,7 +47,8 @@ bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** buffer); // The caller keeps ownership of buffer and msg. -bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg); +bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, + int max_message_size); } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 4694a3a7ff..d8f8ab4b94 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -100,7 +100,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { public: explicit CallData(Server* server, SyncRequest* mrd) : cq_(mrd->cq_), - call_(mrd->call_, server, &cq_), + call_(mrd->call_, server, &cq_, server->max_message_size_), ctx_(mrd->deadline_, mrd->request_metadata_.metadata, mrd->request_metadata_.count), has_request_payload_(mrd->has_request_payload_), @@ -126,7 +126,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { if (has_request_payload_) { GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_.call()); req.reset(method_->AllocateRequestProto()); - if (!DeserializeProto(request_payload_, req.get())) { + if (!DeserializeProto(request_payload_, req.get(), call_.max_message_size())) { abort(); // for now } GRPC_TIMER_MARK(DESER_PROTO_END, call_.call()); @@ -176,12 +176,27 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned) - : started_(false), +grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) { + if (max_message_size > 0) { + grpc_arg arg; + arg.type = GRPC_ARG_INTEGER; + arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); + arg.value.integer = max_message_size; + grpc_channel_args args = {1, &arg}; + return grpc_server_create(cq, &args); + } else { + return grpc_server_create(cq, nullptr); + } +} + +Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, + int max_message_size) + : max_message_size_(max_message_size), + started_(false), shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), - server_(grpc_server_create(cq_.cq(), nullptr)), + server_(CreateServer(cq_.cq(), max_message_size)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) {} @@ -347,7 +362,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { if (*status && request_) { if (payload_) { GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_); - *status = DeserializeProto(payload_, request_); + *status = DeserializeProto(payload_, request_, + server_->max_message_size_); GRPC_TIMER_MARK(DESER_PROTO_END, call_); } else { *status = false; @@ -374,7 +390,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } ctx->call_ = call_; ctx->cq_ = cq_; - Call call(call_, server_, cq_); + Call call(call_, server_, cq_, server_->max_message_size_); if (orig_status && call_) { ctx->BeginCompletionOp(&call); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 81cb0e6724..e48d1eeb42 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -42,7 +42,7 @@ namespace grpc { ServerBuilder::ServerBuilder() - : generic_service_(nullptr), thread_pool_(nullptr) {} + : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); @@ -86,7 +86,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = new ThreadPool(cores); thread_pool_owned = true; } - std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned)); + std::unique_ptr<Server> server( + new Server(thread_pool_, thread_pool_owned, max_message_size_)); for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService(*service)) { |