aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2015-08-13 22:25:15 -0700
committerGravatar Julien Boeuf <jboeuf@google.com>2015-08-13 22:25:15 -0700
commit455dc6ff0df2825ca6cfa35b29a4439943567c73 (patch)
tree798fcb62f115b541d1085e45089931ccfe702304 /src/cpp/server
parent5941335be3700ea9bfe137b2eeccf9cba0540300 (diff)
parent0791cf0dcdad6c2a972187dcc9d69dc99adecfbe (diff)
Merge branch 'master' of https://github.com/grpc/grpc into cpp_auth_md_processor
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/server.cc56
-rw-r--r--src/cpp/server/server_builder.cc10
-rw-r--r--src/cpp/server/server_context.cc1
3 files changed, 53 insertions, 14 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index ab87b22f5f..90f3854a72 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
+ call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
- ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); }
+ ~SyncRequest() {
+ if (call_details_) {
+ delete call_details_;
+ }
+ grpc_metadata_array_destroy(&request_metadata_);
+ }
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
@@ -84,7 +90,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- void SetupRequest() { cq_ = grpc_completion_queue_create(); }
+ void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@@ -94,17 +100,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_server_request_registered_call(
- server, tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this));
+ if (tag_) {
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_registered_call(
+ server, tag_, &call_, &deadline_, &request_metadata_,
+ has_request_payload_ ? &request_payload_ : nullptr, cq_,
+ notify_cq, this));
+ } else {
+ if (!call_details_) {
+ call_details_ = new grpc_call_details;
+ grpc_call_details_init(call_details_);
+ }
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
+ server, &call_, call_details_,
+ &request_metadata_, cq_, notify_cq, this));
+ }
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
if (!*status) {
grpc_completion_queue_destroy(cq_);
}
+ if (call_details_) {
+ deadline_ = call_details_->deadline;
+ grpc_call_details_destroy(call_details_);
+ grpc_call_details_init(call_details_);
+ }
return true;
}
@@ -157,6 +178,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
+ grpc_call_details* call_details_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
grpc_byte_buffer* request_payload_;
@@ -170,9 +192,9 @@ static grpc_server* CreateServer(int max_message_size) {
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.value.integer = max_message_size;
grpc_channel_args args = {1, &arg};
- return grpc_server_create(&args);
+ return grpc_server_create(&args, nullptr);
} else {
- return grpc_server_create(nullptr);
+ return grpc_server_create(nullptr, nullptr);
}
}
@@ -183,10 +205,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
+ has_generic_service_(false),
server_(CreateServer(max_message_size)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
- grpc_server_register_completion_queue(server_, cq_.cq());
+ grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
Server::~Server() {
@@ -223,7 +246,8 @@ bool Server::RegisterService(const grpc::string *host, RpcService* service) {
return true;
}
-bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) {
+bool Server::RegisterAsyncService(const grpc::string* host,
+ AsynchronousService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->server_ = this;
@@ -245,6 +269,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an async generic service against one server.");
service->server_ = this;
+ has_generic_service_ = true;
}
int Server::AddListeningPort(const grpc::string& addr,
@@ -258,6 +283,11 @@ bool Server::Start() {
started_ = true;
grpc_server_start(server_);
+ if (!has_generic_service_) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ sync_methods_->emplace_back(unknown_method_.get(), nullptr);
+ }
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -297,8 +327,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), cops, nops, ops));
+ auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == result);
}
Server::BaseAsyncRequest::BaseAsyncRequest(
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index f723d4611a..09118879f4 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -38,6 +38,7 @@
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
#include <grpc++/thread_pool_interface.h>
+#include <grpc++/fixed_size_thread_pool.h>
namespace grpc {
@@ -100,10 +101,17 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true;
}
+ // Async services only, create a thread pool to handle requests to unknown
+ // services.
+ if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
+ thread_pool_ = new FixedSizeThreadPool(1);
+ thread_pool_owned = true;
+ }
std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq());
+ grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ nullptr);
}
for (auto service = services_.begin(); service != services_.end();
service++) {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 04373397f9..bb34040a2f 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -91,6 +91,7 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
ops->flags = 0;
+ ops->reserved = NULL;
*nops = 1;
}