aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r--src/cpp/server/server.cc62
1 files changed, 47 insertions, 15 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index ab87b22f5f..a70b555855 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
+ call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
- ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); }
+ ~SyncRequest() {
+ if (call_details_) {
+ delete call_details_;
+ }
+ grpc_metadata_array_destroy(&request_metadata_);
+ }
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
@@ -84,7 +90,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- void SetupRequest() { cq_ = grpc_completion_queue_create(); }
+ void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@@ -94,17 +100,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_server_request_registered_call(
- server, tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this));
+ if (tag_) {
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_registered_call(
+ server, tag_, &call_, &deadline_, &request_metadata_,
+ has_request_payload_ ? &request_payload_ : nullptr, cq_,
+ notify_cq, this));
+ } else {
+ if (!call_details_) {
+ call_details_ = new grpc_call_details;
+ grpc_call_details_init(call_details_);
+ }
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
+ server, &call_, call_details_,
+ &request_metadata_, cq_, notify_cq, this));
+ }
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
if (!*status) {
grpc_completion_queue_destroy(cq_);
}
+ if (call_details_) {
+ deadline_ = call_details_->deadline;
+ grpc_call_details_destroy(call_details_);
+ grpc_call_details_init(call_details_);
+ }
return true;
}
@@ -157,6 +178,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
+ grpc_call_details* call_details_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
grpc_byte_buffer* request_payload_;
@@ -170,9 +192,9 @@ static grpc_server* CreateServer(int max_message_size) {
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.value.integer = max_message_size;
grpc_channel_args args = {1, &arg};
- return grpc_server_create(&args);
+ return grpc_server_create(&args, nullptr);
} else {
- return grpc_server_create(nullptr);
+ return grpc_server_create(nullptr, nullptr);
}
}
@@ -183,10 +205,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
+ has_generic_service_(false),
server_(CreateServer(max_message_size)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
- grpc_server_register_completion_queue(server_, cq_.cq());
+ grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
Server::~Server() {
@@ -217,13 +240,13 @@ bool Server::RegisterService(const grpc::string *host, RpcService* service) {
method->name());
return false;
}
- SyncRequest request(method, tag);
- sync_methods_->emplace_back(request);
+ sync_methods_->emplace_back(method, tag);
}
return true;
}
-bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) {
+bool Server::RegisterAsyncService(const grpc::string* host,
+ AsynchronousService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->server_ = this;
@@ -245,6 +268,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an async generic service against one server.");
service->server_ = this;
+ has_generic_service_ = true;
}
int Server::AddListeningPort(const grpc::string& addr,
@@ -258,6 +282,14 @@ bool Server::Start() {
started_ = true;
grpc_server_start(server_);
+ if (!has_generic_service_) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted here
+ // by gcc-4.4 because it can't match the anonymous nullptr with a proper
+ // constructor implicitly. Construct the object and use push_back.
+ sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -297,8 +329,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), cops, nops, ops));
+ auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == result);
}
Server::BaseAsyncRequest::BaseAsyncRequest(