aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-04-30 16:07:06 -0700
committerGravatar Yang Gao <yangg@google.com>2015-04-30 16:07:06 -0700
commit3921c56bee2adff62cb0f9519114d2aa22a67410 (patch)
tree7fdd42a532ca2040b15441d92bcd0b66eccf0397 /src/cpp
parent6d42a73bb984e19eb1bf84e2a952eec861cea464 (diff)
Expose max message size at the server side
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/common/call.cc13
-rw-r--r--src/cpp/proto/proto_utils.cc9
-rw-r--r--src/cpp/proto/proto_utils.h3
-rw-r--r--src/cpp/server/server.cc30
-rw-r--r--src/cpp/server/server_builder.cc5
5 files changed, 46 insertions, 14 deletions
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 9878133331..25609a7759 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -55,6 +55,7 @@ CallOpBuffer::CallOpBuffer()
recv_message_(nullptr),
recv_message_buffer_(nullptr),
recv_buf_(nullptr),
+ max_message_size_(-1),
client_send_close_(false),
recv_trailing_metadata_(nullptr),
recv_status_(nullptr),
@@ -311,7 +312,7 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
got_message = *status;
if (recv_message_) {
GRPC_TIMER_MARK(DESER_PROTO_BEGIN, 0);
- *status = *status && DeserializeProto(recv_buf_, recv_message_);
+ *status = *status && DeserializeProto(recv_buf_, recv_message_, max_message_size_);
grpc_byte_buffer_destroy(recv_buf_);
GRPC_TIMER_MARK(DESER_PROTO_END, 0);
} else {
@@ -338,9 +339,17 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
}
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
- : call_hook_(call_hook), cq_(cq), call_(call) {}
+ : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
+
+Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
+ int max_message_size)
+ : call_hook_(call_hook), cq_(cq), call_(call),
+ max_message_size_(max_message_size) {}
void Call::PerformOps(CallOpBuffer* buffer) {
+ if (max_message_size_ > 0) {
+ buffer->set_max_message_size(max_message_size_);
+ }
call_hook_->PerformOpsOnCall(buffer, this);
}
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index b8de2ea173..8ab536aab8 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -158,9 +158,14 @@ bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
return msg.SerializeToZeroCopyStream(&writer);
}
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg) {
+bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size) {
GrpcBufferReader reader(buffer);
- return msg->ParseFromZeroCopyStream(&reader);
+ ::grpc::protobuf::io::CodedInputStream decoder(&reader);
+ if (max_message_size > 0) {
+ decoder.SetTotalBytesLimit(max_message_size, max_message_size);
+ }
+ return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage();
}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h
index bc60dc9929..67a775b3ca 100644
--- a/src/cpp/proto/proto_utils.h
+++ b/src/cpp/proto/proto_utils.h
@@ -47,7 +47,8 @@ bool SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer);
// The caller keeps ownership of buffer and msg.
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg);
+bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size);
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 4694a3a7ff..d8f8ab4b94 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -100,7 +100,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
explicit CallData(Server* server, SyncRequest* mrd)
: cq_(mrd->cq_),
- call_(mrd->call_, server, &cq_),
+ call_(mrd->call_, server, &cq_, server->max_message_size_),
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
@@ -126,7 +126,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
if (has_request_payload_) {
GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_.call());
req.reset(method_->AllocateRequestProto());
- if (!DeserializeProto(request_payload_, req.get())) {
+ if (!DeserializeProto(request_payload_, req.get(), call_.max_message_size())) {
abort(); // for now
}
GRPC_TIMER_MARK(DESER_PROTO_END, call_.call());
@@ -176,12 +176,27 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
-Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned)
- : started_(false),
+grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) {
+ if (max_message_size > 0) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_INTEGER;
+ arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
+ arg.value.integer = max_message_size;
+ grpc_channel_args args = {1, &arg};
+ return grpc_server_create(cq, &args);
+ } else {
+ return grpc_server_create(cq, nullptr);
+ }
+}
+
+Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
+ int max_message_size)
+ : max_message_size_(max_message_size),
+ started_(false),
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
- server_(grpc_server_create(cq_.cq(), nullptr)),
+ server_(CreateServer(cq_.cq(), max_message_size)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {}
@@ -347,7 +362,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
if (*status && request_) {
if (payload_) {
GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_);
- *status = DeserializeProto(payload_, request_);
+ *status = DeserializeProto(payload_, request_,
+ server_->max_message_size_);
GRPC_TIMER_MARK(DESER_PROTO_END, call_);
} else {
*status = false;
@@ -374,7 +390,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
}
ctx->call_ = call_;
ctx->cq_ = cq_;
- Call call(call_, server_, cq_);
+ Call call(call_, server_, cq_, server_->max_message_size_);
if (orig_status && call_) {
ctx->BeginCompletionOp(&call);
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 81cb0e6724..e48d1eeb42 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -42,7 +42,7 @@
namespace grpc {
ServerBuilder::ServerBuilder()
- : generic_service_(nullptr), thread_pool_(nullptr) {}
+ : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {}
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service());
@@ -86,7 +86,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = new ThreadPool(cores);
thread_pool_owned = true;
}
- std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned));
+ std::unique_ptr<Server> server(
+ new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService(*service)) {