aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-04-30 16:07:06 -0700
committerGravatar Yang Gao <yangg@google.com>2015-04-30 16:07:06 -0700
commit3921c56bee2adff62cb0f9519114d2aa22a67410 (patch)
tree7fdd42a532ca2040b15441d92bcd0b66eccf0397 /src/cpp/server
parent6d42a73bb984e19eb1bf84e2a952eec861cea464 (diff)
Expose max message size at the server side
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/server.cc30
-rw-r--r--src/cpp/server/server_builder.cc5
2 files changed, 26 insertions, 9 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 4694a3a7ff..d8f8ab4b94 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -100,7 +100,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
explicit CallData(Server* server, SyncRequest* mrd)
: cq_(mrd->cq_),
- call_(mrd->call_, server, &cq_),
+ call_(mrd->call_, server, &cq_, server->max_message_size_),
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
@@ -126,7 +126,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
if (has_request_payload_) {
GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_.call());
req.reset(method_->AllocateRequestProto());
- if (!DeserializeProto(request_payload_, req.get())) {
+ if (!DeserializeProto(request_payload_, req.get(), call_.max_message_size())) {
abort(); // for now
}
GRPC_TIMER_MARK(DESER_PROTO_END, call_.call());
@@ -176,12 +176,27 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
-Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned)
- : started_(false),
+grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) {
+ if (max_message_size > 0) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_INTEGER;
+ arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
+ arg.value.integer = max_message_size;
+ grpc_channel_args args = {1, &arg};
+ return grpc_server_create(cq, &args);
+ } else {
+ return grpc_server_create(cq, nullptr);
+ }
+}
+
+Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
+ int max_message_size)
+ : max_message_size_(max_message_size),
+ started_(false),
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
- server_(grpc_server_create(cq_.cq(), nullptr)),
+ server_(CreateServer(cq_.cq(), max_message_size)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {}
@@ -347,7 +362,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
if (*status && request_) {
if (payload_) {
GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_);
- *status = DeserializeProto(payload_, request_);
+ *status = DeserializeProto(payload_, request_,
+ server_->max_message_size_);
GRPC_TIMER_MARK(DESER_PROTO_END, call_);
} else {
*status = false;
@@ -374,7 +390,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
}
ctx->call_ = call_;
ctx->cq_ = cq_;
- Call call(call_, server_, cq_);
+ Call call(call_, server_, cq_, server_->max_message_size_);
if (orig_status && call_) {
ctx->BeginCompletionOp(&call);
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 81cb0e6724..e48d1eeb42 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -42,7 +42,7 @@
namespace grpc {
ServerBuilder::ServerBuilder()
- : generic_service_(nullptr), thread_pool_(nullptr) {}
+ : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {}
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service());
@@ -86,7 +86,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = new ThreadPool(cores);
thread_pool_owned = true;
}
- std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned));
+ std::unique_ptr<Server> server(
+ new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService(*service)) {