aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc189
-rw-r--r--src/cpp/server/server.cc39
-rw-r--r--src/cpp/server/server_builder.cc45
3 files changed, 121 insertions, 152 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 3c8ca8ab45..bc23095326 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -491,39 +491,109 @@ void PrintHeaderServerMethodAsync(
grpc_cpp_generator::ClassName(method->input_type(), true);
(*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true);
+ printer->Print(*vars, "template <class BaseClass>\n");
+ printer->Print(*vars,
+ "class WithAsyncMethod_$Method$ : public BaseClass {\n");
+ printer->Print(
+ " private:\n"
+ " void BaseClassMustBeDerivedFromService(Service *service) {}\n");
+ printer->Print(" public:\n");
+ printer->Indent();
+ printer->Print(*vars,
+ "~WithAsyncMethod_$Method$() {\n"
+ " BaseClassMustBeDerivedFromService(this);\n"
+ "}\n");
if (NoStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, const $Request$* request, "
+ "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncUnary($Idx$, context, "
+ "request, response, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReader< $Request$>* reader, "
+ "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncClientStreaming($Idx$, "
+ "context, reader, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, const $Request$* request, "
+ "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE "
+ "{\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(
+ *vars,
+ " ::grpc::Service::RequestAsyncServerStreaming($Idx$, "
+ "context, request, writer, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReaderWriter< $Response$, $Request$>* stream) "
+ "GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncBidiStreaming($Idx$, "
+ "context, stream, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
}
+ printer->Outdent();
+ printer->Print(*vars, "};\n");
}
void PrintHeaderService(grpc::protobuf::io::Printer *printer,
@@ -580,9 +650,9 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Print("\n");
- // Server side - Synchronous
+ // Server side - base
printer->Print(
- "class Service : public ::grpc::SynchronousService {\n"
+ "class Service : public ::grpc::Service {\n"
" public:\n");
printer->Indent();
printer->Print("Service();\n");
@@ -590,26 +660,26 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethodSync(printer, service->method(i), vars);
}
- printer->Print("::grpc::RpcService* service() GRPC_OVERRIDE GRPC_FINAL;\n");
printer->Outdent();
- printer->Print(
- " private:\n"
- " std::unique_ptr< ::grpc::RpcService> service_;\n");
printer->Print("};\n");
// Server side - Asynchronous
- printer->Print(
- "class AsyncService GRPC_FINAL : public ::grpc::AsynchronousService {\n"
- " public:\n");
- printer->Indent();
- (*vars)["MethodCount"] = as_string(service->method_count());
- printer->Print("explicit AsyncService();\n");
- printer->Print("~AsyncService() {};\n");
for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["Idx"] = as_string(i);
PrintHeaderServerMethodAsync(printer, service->method(i), vars);
}
- printer->Outdent();
- printer->Print("};\n");
+
+ printer->Print("typedef ");
+
+ for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["method_name"] = service->method(i)->name();
+ printer->Print(*vars, "WithAsyncMethod_$method_name$<");
+ }
+ printer->Print("Service");
+ for (int i = 0; i < service->method_count(); ++i) {
+ printer->Print(" >");
+ }
+ printer->Print(" AsyncService;\n");
printer->Outdent();
printer->Print("};\n");
@@ -889,69 +959,6 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
}
}
-void PrintSourceServerAsyncMethod(
- grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- (*vars)["Method"] = method->name();
- (*vars)["Request"] =
- grpc_cpp_generator::ClassName(method->input_type(), true);
- (*vars)["Response"] =
- grpc_cpp_generator::ClassName(method->output_type(), true);
- if (NoStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "$Request$* request, "
- "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestAsyncUnary($Idx$, context, "
- "request, response, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (ClientOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestClientStreaming($Idx$, "
- "context, reader, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (ServerOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "$Request$* request, "
- "::grpc::ServerAsyncWriter< $Response$>* writer, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(
- *vars,
- " AsynchronousService::RequestServerStreaming($Idx$, "
- "context, request, writer, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestBidiStreaming($Idx$, "
- "context, stream, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- }
-}
-
void PrintSourceService(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::ServiceDescriptor *service,
std::map<grpc::string, grpc::string> *vars) {
@@ -1006,13 +1013,6 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
PrintSourceClientMethod(printer, service->method(i), vars);
}
- (*vars)["MethodCount"] = as_string(service->method_count());
- printer->Print(*vars,
- "$ns$$Service$::AsyncService::AsyncService() : "
- "::grpc::AsynchronousService("
- "$prefix$$Service$_method_names, $MethodCount$) "
- "{}\n\n");
-
printer->Print(*vars,
"$ns$$Service$::Service::Service() {\n"
"}\n\n");
@@ -1022,15 +1022,9 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Idx"] = as_string(i);
PrintSourceServerMethod(printer, service->method(i), vars);
- PrintSourceServerAsyncMethod(printer, service->method(i), vars);
}
- printer->Print(*vars,
- "::grpc::RpcService* $ns$$Service$::Service::service() {\n");
- printer->Indent();
- printer->Print(
- "if (service_) {\n"
- " return service_.get();\n"
- "}\n");
+
+#if 0
printer->Print("service_ = std::unique_ptr< ::grpc::RpcService>(new ::grpc::RpcService());\n");
for (int i = 0; i < service->method_count(); ++i) {
const grpc::protobuf::MethodDescriptor *method = service->method(i);
@@ -1082,6 +1076,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
printer->Print("return service_.get();\n");
printer->Outdent();
printer->Print("}\n\n");
+#endif
}
grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file,
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 878775bbee..898f68f104 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
#include <grpc++/generic/async_generic_service.h>
+#include <grpc++/impl/method_handler_impl.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
@@ -314,36 +315,28 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
g_callbacks = callbacks;
}
-bool Server::RegisterService(const grpc::string* host, RpcService* service) {
- for (int i = 0; i < service->GetMethodCount(); ++i) {
- RpcServiceMethod* method = service->GetMethod(i);
+bool Server::RegisterService(const grpc::string* host, Service* service) {
+ bool has_async_methods = service->has_async_methods();
+ if (has_async_methods) {
+ GPR_ASSERT(service->server_ == nullptr &&
+ "Can only register an asynchronous service against one server.");
+ service->server_ = this;
+ }
+ for (auto it = service->methods_.begin(); it != service->methods_.end();
+ ++it) {
+ RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(server_, method->name(),
host ? host->c_str() : nullptr);
- if (!tag) {
+ if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- sync_methods_->emplace_back(method, tag);
- }
- return true;
-}
-
-bool Server::RegisterAsyncService(const grpc::string* host,
- AsynchronousService* service) {
- GPR_ASSERT(service->server_ == nullptr &&
- "Can only register an asynchronous service against one server.");
- service->server_ = this;
- service->request_args_ = new void* [service->method_count_];
- for (size_t i = 0; i < service->method_count_; ++i) {
- void* tag = grpc_server_register_method(server_, service->method_names_[i],
- host ? host->c_str() : nullptr);
- if (!tag) {
- gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
- service->method_names_[i]);
- return false;
+ if (method->handler() == nullptr) {
+ method->set_server_tag(tag);
+ } else {
+ sync_methods_->emplace_back(method, tag);
}
- service->request_args_[i] = tag;
}
return true;
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 26c0724a30..ca82953142 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -43,7 +43,7 @@
namespace grpc {
ServerBuilder::ServerBuilder()
- : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {
+ : max_message_size_(-1), generic_service_(nullptr) {
grpc_compression_options_init(&compression_options_);
}
@@ -53,24 +53,13 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
return std::unique_ptr<ServerCompletionQueue>(cq);
}
-void ServerBuilder::RegisterService(SynchronousService* service) {
- services_.emplace_back(new NamedService<RpcService>(service->service()));
-}
-
-void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
- async_services_.emplace_back(new NamedService<AsynchronousService>(service));
+void ServerBuilder::RegisterService(Service* service) {
+ services_.emplace_back(new NamedService(service));
}
void ServerBuilder::RegisterService(const grpc::string& addr,
- SynchronousService* service) {
- services_.emplace_back(
- new NamedService<RpcService>(addr, service->service()));
-}
-
-void ServerBuilder::RegisterAsyncService(const grpc::string& addr,
- AsynchronousService* service) {
- async_services_.emplace_back(
- new NamedService<AsynchronousService>(addr, service));
+ Service* service) {
+ services_.emplace_back(new NamedService(addr, service));
}
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@@ -96,14 +85,13 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr,
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- bool thread_pool_owned = false;
- if (!async_services_.empty() && !services_.empty()) {
- gpr_log(GPR_ERROR, "Mixing async and sync services is unsupported for now");
- return nullptr;
- }
- if (!thread_pool_ && !services_.empty()) {
- thread_pool_ = CreateDefaultThreadPool();
- thread_pool_owned = true;
+ std::unique_ptr<ThreadPoolInterface> thread_pool;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_synchronous_methods()) {
+ if (thread_pool == nullptr && !services_.empty()) {
+ thread_pool.reset(CreateDefaultThreadPool());
+ }
+ }
}
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
@@ -115,7 +103,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
- new Server(thread_pool_, thread_pool_owned, max_message_size_, args));
+ new Server(thread_pool.release(), true, max_message_size_, args));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
@@ -126,13 +114,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
- for (auto service = async_services_.begin(); service != async_services_.end();
- service++) {
- if (!server->RegisterAsyncService((*service)->host.get(),
- (*service)->service)) {
- return nullptr;
- }
- }
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
}