aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/impl/method_handler_impl.h203
-rw-r--r--include/grpc++/impl/rpc_service_method.h189
-rw-r--r--include/grpc++/impl/service_type.h91
-rw-r--r--include/grpc++/server.h27
-rw-r--r--include/grpc++/server_builder.h35
-rw-r--r--src/compiler/cpp_generator.cc227
-rw-r--r--src/cpp/server/server.cc39
-rw-r--r--src/cpp/server/server_builder.cc46
-rw-r--r--test/cpp/end2end/async_end2end_test.cc12
-rw-r--r--test/cpp/qps/client_async.cc2
-rw-r--r--test/cpp/qps/server_async.cc2
11 files changed, 430 insertions, 443 deletions
diff --git a/include/grpc++/impl/method_handler_impl.h b/include/grpc++/impl/method_handler_impl.h
new file mode 100644
index 0000000000..8f7121b915
--- /dev/null
+++ b/include/grpc++/impl/method_handler_impl.h
@@ -0,0 +1,203 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_METHOD_HANDLER_IMPL_H
+#define GRPCXX_IMPL_METHOD_HANDLER_IMPL_H
+
+#include <grpc++/impl/rpc_service_method.h>
+#include <grpc++/support/sync_stream.h>
+
+namespace grpc {
+
+// A wrapper class of an application provided rpc method handler.
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler : public MethodHandler {
+ public:
+ RpcMethodHandler(
+ std::function<Status(ServiceType*, ServerContext*, const RequestType*,
+ ResponseType*)> func,
+ ServiceType* service)
+ : func_(func), service_(service) {}
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(
+ param.request, &req, param.max_message_size);
+ ResponseType rsp;
+ if (status.ok()) {
+ status = func_(service_, param.server_context, &req, &rsp);
+ }
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.ok()) {
+ status = ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+
+ private:
+ // Application provided rpc handler function.
+ std::function<Status(ServiceType*, ServerContext*, const RequestType*,
+ ResponseType*)> func_;
+ // The class the above handler function lives in.
+ ServiceType* service_;
+};
+
+// A wrapper class of an application provided client streaming handler.
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler : public MethodHandler {
+ public:
+ ClientStreamingHandler(
+ std::function<Status(ServiceType*, ServerContext*,
+ ServerReader<RequestType>*, ResponseType*)> func,
+ ServiceType* service)
+ : func_(func), service_(service) {}
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ ServerReader<RequestType> reader(param.call, param.server_context);
+ ResponseType rsp;
+ Status status = func_(service_, param.server_context, &reader, &rsp);
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.ok()) {
+ status = ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+
+ private:
+ std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
+ ResponseType*)> func_;
+ ServiceType* service_;
+};
+
+// A wrapper class of an application provided server streaming handler.
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler : public MethodHandler {
+ public:
+ ServerStreamingHandler(
+ std::function<Status(ServiceType*, ServerContext*, const RequestType*,
+ ServerWriter<ResponseType>*)> func,
+ ServiceType* service)
+ : func_(func), service_(service) {}
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(
+ param.request, &req, param.max_message_size);
+
+ if (status.ok()) {
+ ServerWriter<ResponseType> writer(param.call, param.server_context);
+ status = func_(service_, param.server_context, &req, &writer);
+ }
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+
+ private:
+ std::function<Status(ServiceType*, ServerContext*, const RequestType*,
+ ServerWriter<ResponseType>*)> func_;
+ ServiceType* service_;
+};
+
+// A wrapper class of an application provided bidi-streaming handler.
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler : public MethodHandler {
+ public:
+ BidiStreamingHandler(
+ std::function<Status(ServiceType*, ServerContext*,
+ ServerReaderWriter<ResponseType, RequestType>*)>
+ func,
+ ServiceType* service)
+ : func_(func), service_(service) {}
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ ServerReaderWriter<ResponseType, RequestType> stream(param.call,
+ param.server_context);
+ Status status = func_(service_, param.server_context, &stream);
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+
+ private:
+ std::function<Status(ServiceType*, ServerContext*,
+ ServerReaderWriter<ResponseType, RequestType>*)> func_;
+ ServiceType* service_;
+};
+
+// Handle unknown method by returning UNIMPLEMENTED error.
+class UnknownMethodHandler : public MethodHandler {
+ public:
+ template <class T>
+ static void FillOps(ServerContext* context, T* ops) {
+ Status status(StatusCode::UNIMPLEMENTED, "");
+ if (!context->sent_initial_metadata_) {
+ ops->SendInitialMetadata(context->initial_metadata_);
+ context->sent_initial_metadata_ = true;
+ }
+ ops->ServerSendStatus(context->trailing_metadata_, status);
+ }
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ FillOps(param.server_context, &ops);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+};
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_METHOD_HANDLER_IMPL_H \ No newline at end of file
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index b203c8f53a..3b47a4d64d 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -43,14 +43,11 @@
#include <grpc++/impl/rpc_method.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
-#include <grpc++/support/sync_stream.h>
namespace grpc {
class ServerContext;
class StreamContextInterface;
-// TODO(rocking): we might need to split this file into multiple ones.
-
// Base class for running an RPC handler.
class MethodHandler {
public:
@@ -71,197 +68,25 @@ class MethodHandler {
virtual void RunHandler(const HandlerParameter& param) = 0;
};
-// A wrapper class of an application provided rpc method handler.
-template <class ServiceType, class RequestType, class ResponseType>
-class RpcMethodHandler : public MethodHandler {
- public:
- RpcMethodHandler(
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ResponseType*)> func,
- ServiceType* service)
- : func_(func), service_(service) {}
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- RequestType req;
- Status status = SerializationTraits<RequestType>::Deserialize(
- param.request, &req, param.max_message_size);
- ResponseType rsp;
- if (status.ok()) {
- status = func_(service_, param.server_context, &req, &rsp);
- }
-
- GPR_ASSERT(!param.server_context->sent_initial_metadata_);
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus> ops;
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
- if (status.ok()) {
- status = ops.SendMessage(rsp);
- }
- ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-
- private:
- // Application provided rpc handler function.
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ResponseType*)> func_;
- // The class the above handler function lives in.
- ServiceType* service_;
-};
-
-// A wrapper class of an application provided client streaming handler.
-template <class ServiceType, class RequestType, class ResponseType>
-class ClientStreamingHandler : public MethodHandler {
- public:
- ClientStreamingHandler(
- std::function<Status(ServiceType*, ServerContext*,
- ServerReader<RequestType>*, ResponseType*)> func,
- ServiceType* service)
- : func_(func), service_(service) {}
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- ServerReader<RequestType> reader(param.call, param.server_context);
- ResponseType rsp;
- Status status = func_(service_, param.server_context, &reader, &rsp);
-
- GPR_ASSERT(!param.server_context->sent_initial_metadata_);
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus> ops;
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
- if (status.ok()) {
- status = ops.SendMessage(rsp);
- }
- ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-
- private:
- std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
- ResponseType*)> func_;
- ServiceType* service_;
-};
-
-// A wrapper class of an application provided server streaming handler.
-template <class ServiceType, class RequestType, class ResponseType>
-class ServerStreamingHandler : public MethodHandler {
- public:
- ServerStreamingHandler(
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ServerWriter<ResponseType>*)> func,
- ServiceType* service)
- : func_(func), service_(service) {}
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- RequestType req;
- Status status = SerializationTraits<RequestType>::Deserialize(
- param.request, &req, param.max_message_size);
-
- if (status.ok()) {
- ServerWriter<ResponseType> writer(param.call, param.server_context);
- status = func_(service_, param.server_context, &req, &writer);
- }
-
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
- if (!param.server_context->sent_initial_metadata_) {
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
- }
- ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-
- private:
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ServerWriter<ResponseType>*)> func_;
- ServiceType* service_;
-};
-
-// A wrapper class of an application provided bidi-streaming handler.
-template <class ServiceType, class RequestType, class ResponseType>
-class BidiStreamingHandler : public MethodHandler {
- public:
- BidiStreamingHandler(
- std::function<Status(ServiceType*, ServerContext*,
- ServerReaderWriter<ResponseType, RequestType>*)>
- func,
- ServiceType* service)
- : func_(func), service_(service) {}
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- ServerReaderWriter<ResponseType, RequestType> stream(param.call,
- param.server_context);
- Status status = func_(service_, param.server_context, &stream);
-
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
- if (!param.server_context->sent_initial_metadata_) {
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
- }
- ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-
- private:
- std::function<Status(ServiceType*, ServerContext*,
- ServerReaderWriter<ResponseType, RequestType>*)> func_;
- ServiceType* service_;
-};
-
-// Handle unknown method by returning UNIMPLEMENTED error.
-class UnknownMethodHandler : public MethodHandler {
- public:
- template <class T>
- static void FillOps(ServerContext* context, T* ops) {
- Status status(StatusCode::UNIMPLEMENTED, "");
- if (!context->sent_initial_metadata_) {
- ops->SendInitialMetadata(context->initial_metadata_);
- context->sent_initial_metadata_ = true;
- }
- ops->ServerSendStatus(context->trailing_metadata_, status);
- }
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
- FillOps(param.server_context, &ops);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-};
-
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
// Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
MethodHandler* handler)
- : RpcMethod(name, type), handler_(handler) {}
+ : RpcMethod(name, type), server_tag_(nullptr), handler_(handler) {}
- MethodHandler* handler() { return handler_.get(); }
+ void set_server_tag(void* tag) { server_tag_ = tag; }
+ void* server_tag() const { return server_tag_; }
+ // if MethodHandler is nullptr, then this is an async method
+ MethodHandler* handler() const { return handler_.get(); }
+ void ResetHandler() { handler_.reset(); }
private:
+ void* server_tag_;
std::unique_ptr<MethodHandler> handler_;
};
-// This class contains all the method information for an rpc service. It is
-// used for registering a service on a grpc server.
-class RpcService {
- public:
- // Takes ownership.
- void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
-
- RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); }
- int GetMethodCount() const {
- // On win x64, int is only 32bit
- GPR_ASSERT(methods_.size() <= INT_MAX);
- return (int)methods_.size();
- }
-
- private:
- std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
-};
-
} // namespace grpc
#endif // GRPCXX_IMPL_RPC_SERVICE_METHOD_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index 3b6ac1de77..655aa91cdc 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -34,6 +34,7 @@
#ifndef GRPCXX_IMPL_SERVICE_TYPE_H
#define GRPCXX_IMPL_SERVICE_TYPE_H
+#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/server.h>
#include <grpc++/support/config.h>
@@ -43,17 +44,10 @@ namespace grpc {
class Call;
class CompletionQueue;
-class RpcService;
class Server;
class ServerCompletionQueue;
class ServerContext;
-class SynchronousService {
- public:
- virtual ~SynchronousService() {}
- virtual RpcService* service() = 0;
-};
-
class ServerAsyncStreamingInterface {
public:
virtual ~ServerAsyncStreamingInterface() {}
@@ -65,15 +59,28 @@ class ServerAsyncStreamingInterface {
virtual void BindCall(Call* call) = 0;
};
-class AsynchronousService {
+class Service {
public:
- AsynchronousService(const char** method_names, size_t method_count)
- : server_(nullptr),
- method_names_(method_names),
- method_count_(method_count),
- request_args_(nullptr) {}
+ Service() : server_(nullptr) {}
+ virtual ~Service() {}
+
+ bool has_async_methods() const {
+ for (auto it = methods_.begin(); it != methods_.end(); ++it) {
+ if ((*it)->handler() == nullptr) {
+ return true;
+ }
+ }
+ return false;
+ }
- ~AsynchronousService() { delete[] request_args_; }
+ bool has_synchronous_methods() const {
+ for (auto it = methods_.begin(); it != methods_.end(); ++it) {
+ if ((*it)->handler() != nullptr) {
+ return true;
+ }
+ }
+ return false;
+ }
protected:
template <class Message>
@@ -81,41 +88,53 @@ class AsynchronousService {
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag, request);
}
- void RequestClientStreaming(int index, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
- server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ void RequestAsyncClientStreaming(int index, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
+ server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag);
}
template <class Message>
- void RequestServerStreaming(int index, ServerContext* context,
- Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
- server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ void RequestAsyncServerStreaming(int index, ServerContext* context,
+ Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
+ server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag, request);
}
- void RequestBidiStreaming(int index, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag) {
- server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ void RequestAsyncBidiStreaming(int index, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
+ server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag);
}
+ void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
+
+ void MarkMethodAsync(const grpc::string& method_name) {
+ for (auto it = methods_.begin(); it != methods_.end(); ++it) {
+ if ((*it)->name() == method_name) {
+ (*it)->ResetHandler();
+ return;
+ }
+ }
+ abort();
+ }
+
private:
friend class Server;
+
Server* server_;
- const char** const method_names_;
- size_t method_count_;
- void** request_args_;
+ std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
};
} // namespace grpc
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 644e66e6e0..92d7a4b3cc 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -40,6 +40,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
+#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/sync.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/support/channel_arguments.h>
@@ -51,13 +52,11 @@ struct grpc_server;
namespace grpc {
-class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
-class RpcService;
-class RpcServiceMethod;
class ServerAsyncStreamingInterface;
class ServerContext;
+class Service;
class ThreadPoolInterface;
/// Models a gRPC server.
@@ -105,7 +104,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
private:
friend class AsyncGenericService;
- friend class AsynchronousService;
+ friend class Service;
friend class ServerBuilder;
class SyncRequest;
@@ -123,12 +122,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
- bool RegisterService(const grpc::string* host, RpcService* service);
-
- /// Register an asynchronous service. This call does not take ownership of the
- /// service. The service must exist for the lifetime of the Server instance.
- bool RegisterAsyncService(const grpc::string* host,
- AsynchronousService* service);
+ bool RegisterService(const grpc::string* host, Service* service);
/// Register a generic service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
@@ -265,21 +259,22 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
class UnimplementedAsyncResponse;
template <class Message>
- void RequestAsyncCall(void* registered_method, ServerContext* context,
+ void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
- new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
- call_cq, notification_cq, tag, message);
+ new PayloadAsyncRequest<Message>(method->server_tag(), this, context,
+ stream, call_cq, notification_cq, tag,
+ message);
}
- void RequestAsyncCall(void* registered_method, ServerContext* context,
+ void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
- notification_cq, tag);
+ new NoPayloadAsyncRequest(method->server_tag(), this, context, stream,
+ call_cq, notification_cq, tag);
}
void RequestAsyncGenericCall(GenericServerContext* context,
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index b324deb9e0..86c7fecef5 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -44,14 +44,12 @@
namespace grpc {
class AsyncGenericService;
-class AsynchronousService;
class CompletionQueue;
class RpcService;
class Server;
class ServerCompletionQueue;
class ServerCredentials;
-class SynchronousService;
-class ThreadPoolInterface;
+class Service;
/// A builder class for the creation and startup of \a grpc::Server instances.
class ServerBuilder {
@@ -62,14 +60,7 @@ class ServerBuilder {
/// The service must exist for the lifetime of the \a Server instance returned
/// by \a BuildAndStart().
/// Matches requests with any :authority
- void RegisterService(SynchronousService* service);
-
- /// Register an asynchronous service.
- /// This call does not take ownership of the service or completion queue.
- /// The service and completion queuemust exist for the lifetime of the \a
- /// Server instance returned by \a BuildAndStart().
- /// Matches requests with any :authority
- void RegisterAsyncService(AsynchronousService* service);
+ void RegisterService(Service* service);
/// Register a generic service.
/// Matches requests with any :authority
@@ -79,15 +70,7 @@ class ServerBuilder {
/// The service must exist for the lifetime of the \a Server instance returned
/// by BuildAndStart().
/// Only matches requests with :authority \a host
- void RegisterService(const grpc::string& host, SynchronousService* service);
-
- /// Register an asynchronous service.
- /// This call does not take ownership of the service or completion queue.
- /// The service and completion queuemust exist for the lifetime of the \a
- /// Server instance returned by \a BuildAndStart().
- /// Only matches requests with :authority equal to \a host
- void RegisterAsyncService(const grpc::string& host,
- AsynchronousService* service);
+ void RegisterService(const grpc::string& host, Service* service);
/// Set max message size in bytes.
void SetMaxMessageSize(int max_message_size) {
@@ -132,26 +115,22 @@ class ServerBuilder {
};
typedef std::unique_ptr<grpc::string> HostString;
- template <class T>
struct NamedService {
- explicit NamedService(T* s) : service(s) {}
- NamedService(const grpc::string& h, T* s)
+ explicit NamedService(Service* s) : service(s) {}
+ NamedService(const grpc::string& h, Service* s)
: host(new grpc::string(h)), service(s) {}
HostString host;
- T* service;
+ Service* service;
};
int max_message_size_;
grpc_compression_options compression_options_;
std::vector<std::unique_ptr<ServerBuilderOption>> options_;
- std::vector<std::unique_ptr<NamedService<RpcService>>> services_;
- std::vector<std::unique_ptr<NamedService<AsynchronousService>>>
- async_services_;
+ std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_;
std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
AsyncGenericService* generic_service_;
- ThreadPoolInterface* thread_pool_;
};
} // namespace grpc
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 3c8ca8ab45..9d0d7eb469 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -491,39 +491,114 @@ void PrintHeaderServerMethodAsync(
grpc_cpp_generator::ClassName(method->input_type(), true);
(*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true);
+ printer->Print(*vars, "template <class BaseClass>\n");
+ printer->Print(*vars,
+ "class WithAsyncMethod_$Method$ : public BaseClass {\n");
+ printer->Print(
+ " private:\n"
+ " void BaseClassMustBeDerivedFromService(Service *service) {}\n");
+ printer->Print(" public:\n");
+ printer->Indent();
+ printer->Print(*vars,
+ "WithAsyncMethod_$Method$() {\n"
+ " ::grpc::Service::MarkMethodAsync("
+ "\"/$Package$$Service$/$Method$\");\n"
+ "}\n");
+ printer->Print(*vars,
+ "~WithAsyncMethod_$Method$() GRPC_OVERRIDE {\n"
+ " BaseClassMustBeDerivedFromService(this);\n"
+ "}\n");
if (NoStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, const $Request$* request, "
+ "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncUnary($Idx$, context, "
+ "request, response, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReader< $Request$>* reader, "
+ "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncClientStreaming($Idx$, "
+ "context, reader, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, const $Request$* request, "
+ "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE "
+ "{\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(
+ *vars,
+ " ::grpc::Service::RequestAsyncServerStreaming($Idx$, "
+ "context, request, writer, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReaderWriter< $Response$, $Request$>* stream) "
+ "GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncBidiStreaming($Idx$, "
+ "context, stream, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
}
+ printer->Outdent();
+ printer->Print(*vars, "};\n");
}
void PrintHeaderService(grpc::protobuf::io::Printer *printer,
@@ -580,9 +655,9 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Print("\n");
- // Server side - Synchronous
+ // Server side - base
printer->Print(
- "class Service : public ::grpc::SynchronousService {\n"
+ "class Service : public ::grpc::Service {\n"
" public:\n");
printer->Indent();
printer->Print("Service();\n");
@@ -590,26 +665,26 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethodSync(printer, service->method(i), vars);
}
- printer->Print("::grpc::RpcService* service() GRPC_OVERRIDE GRPC_FINAL;\n");
printer->Outdent();
- printer->Print(
- " private:\n"
- " std::unique_ptr< ::grpc::RpcService> service_;\n");
printer->Print("};\n");
// Server side - Asynchronous
- printer->Print(
- "class AsyncService GRPC_FINAL : public ::grpc::AsynchronousService {\n"
- " public:\n");
- printer->Indent();
- (*vars)["MethodCount"] = as_string(service->method_count());
- printer->Print("explicit AsyncService();\n");
- printer->Print("~AsyncService() {};\n");
for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["Idx"] = as_string(i);
PrintHeaderServerMethodAsync(printer, service->method(i), vars);
}
- printer->Outdent();
- printer->Print("};\n");
+
+ printer->Print("typedef ");
+
+ for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["method_name"] = service->method(i)->name();
+ printer->Print(*vars, "WithAsyncMethod_$method_name$<");
+ }
+ printer->Print("Service");
+ for (int i = 0; i < service->method_count(); ++i) {
+ printer->Print(" >");
+ }
+ printer->Print(" AsyncService;\n");
printer->Outdent();
printer->Print("};\n");
@@ -623,6 +698,12 @@ grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
+ // Package string is empty or ends with a dot. It is used to fully qualify
+ // method names.
+ vars["Package"] = file->package();
+ if (!file->package().empty()) {
+ vars["Package"].append(".");
+ }
if (!params.services_namespace.empty()) {
vars["services_namespace"] = params.services_namespace;
@@ -704,6 +785,7 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file,
printer.Print(vars, "#include <grpc++/channel.h>\n");
printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/method_handler_impl.h>\n");
printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
printer.Print(vars, "#include <grpc++/support/async_unary_call.h>\n");
@@ -889,69 +971,6 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
}
}
-void PrintSourceServerAsyncMethod(
- grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- (*vars)["Method"] = method->name();
- (*vars)["Request"] =
- grpc_cpp_generator::ClassName(method->input_type(), true);
- (*vars)["Response"] =
- grpc_cpp_generator::ClassName(method->output_type(), true);
- if (NoStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "$Request$* request, "
- "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestAsyncUnary($Idx$, context, "
- "request, response, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (ClientOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestClientStreaming($Idx$, "
- "context, reader, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (ServerOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "$Request$* request, "
- "::grpc::ServerAsyncWriter< $Response$>* writer, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(
- *vars,
- " AsynchronousService::RequestServerStreaming($Idx$, "
- "context, request, writer, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestBidiStreaming($Idx$, "
- "context, stream, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- }
-}
-
void PrintSourceService(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::ServiceDescriptor *service,
std::map<grpc::string, grpc::string> *vars) {
@@ -1006,32 +1025,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
PrintSourceClientMethod(printer, service->method(i), vars);
}
- (*vars)["MethodCount"] = as_string(service->method_count());
- printer->Print(*vars,
- "$ns$$Service$::AsyncService::AsyncService() : "
- "::grpc::AsynchronousService("
- "$prefix$$Service$_method_names, $MethodCount$) "
- "{}\n\n");
-
- printer->Print(*vars,
- "$ns$$Service$::Service::Service() {\n"
- "}\n\n");
- printer->Print(*vars,
- "$ns$$Service$::Service::~Service() {\n"
- "}\n\n");
- for (int i = 0; i < service->method_count(); ++i) {
- (*vars)["Idx"] = as_string(i);
- PrintSourceServerMethod(printer, service->method(i), vars);
- PrintSourceServerAsyncMethod(printer, service->method(i), vars);
- }
- printer->Print(*vars,
- "::grpc::RpcService* $ns$$Service$::Service::service() {\n");
+ printer->Print(*vars, "$ns$$Service$::Service::Service() {\n");
printer->Indent();
- printer->Print(
- "if (service_) {\n"
- " return service_.get();\n"
- "}\n");
- printer->Print("service_ = std::unique_ptr< ::grpc::RpcService>(new ::grpc::RpcService());\n");
for (int i = 0; i < service->method_count(); ++i) {
const grpc::protobuf::MethodDescriptor *method = service->method(i);
(*vars)["Idx"] = as_string(i);
@@ -1043,7 +1038,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
if (NoStreaming(method)) {
printer->Print(
*vars,
- "service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::NORMAL_RPC,\n"
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
@@ -1053,7 +1048,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
- "service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
@@ -1062,7 +1057,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
@@ -1071,7 +1066,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
- "service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
@@ -1079,9 +1074,15 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
- printer->Print("return service_.get();\n");
printer->Outdent();
- printer->Print("}\n\n");
+ printer->Print(*vars, "}\n\n");
+ printer->Print(*vars,
+ "$ns$$Service$::Service::~Service() {\n"
+ "}\n\n");
+ for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["Idx"] = as_string(i);
+ PrintSourceServerMethod(printer, service->method(i), vars);
+ }
}
grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file,
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 878775bbee..898f68f104 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
#include <grpc++/generic/async_generic_service.h>
+#include <grpc++/impl/method_handler_impl.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
@@ -314,36 +315,28 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
g_callbacks = callbacks;
}
-bool Server::RegisterService(const grpc::string* host, RpcService* service) {
- for (int i = 0; i < service->GetMethodCount(); ++i) {
- RpcServiceMethod* method = service->GetMethod(i);
+bool Server::RegisterService(const grpc::string* host, Service* service) {
+ bool has_async_methods = service->has_async_methods();
+ if (has_async_methods) {
+ GPR_ASSERT(service->server_ == nullptr &&
+ "Can only register an asynchronous service against one server.");
+ service->server_ = this;
+ }
+ for (auto it = service->methods_.begin(); it != service->methods_.end();
+ ++it) {
+ RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(server_, method->name(),
host ? host->c_str() : nullptr);
- if (!tag) {
+ if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- sync_methods_->emplace_back(method, tag);
- }
- return true;
-}
-
-bool Server::RegisterAsyncService(const grpc::string* host,
- AsynchronousService* service) {
- GPR_ASSERT(service->server_ == nullptr &&
- "Can only register an asynchronous service against one server.");
- service->server_ = this;
- service->request_args_ = new void* [service->method_count_];
- for (size_t i = 0; i < service->method_count_; ++i) {
- void* tag = grpc_server_register_method(server_, service->method_names_[i],
- host ? host->c_str() : nullptr);
- if (!tag) {
- gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
- service->method_names_[i]);
- return false;
+ if (method->handler() == nullptr) {
+ method->set_server_tag(tag);
+ } else {
+ sync_methods_->emplace_back(method, tag);
}
- service->request_args_[i] = tag;
}
return true;
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 26c0724a30..bd7dd76b8d 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -43,7 +43,7 @@
namespace grpc {
ServerBuilder::ServerBuilder()
- : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {
+ : max_message_size_(-1), generic_service_(nullptr) {
grpc_compression_options_init(&compression_options_);
}
@@ -53,24 +53,13 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
return std::unique_ptr<ServerCompletionQueue>(cq);
}
-void ServerBuilder::RegisterService(SynchronousService* service) {
- services_.emplace_back(new NamedService<RpcService>(service->service()));
-}
-
-void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
- async_services_.emplace_back(new NamedService<AsynchronousService>(service));
+void ServerBuilder::RegisterService(Service* service) {
+ services_.emplace_back(new NamedService(service));
}
void ServerBuilder::RegisterService(const grpc::string& addr,
- SynchronousService* service) {
- services_.emplace_back(
- new NamedService<RpcService>(addr, service->service()));
-}
-
-void ServerBuilder::RegisterAsyncService(const grpc::string& addr,
- AsynchronousService* service) {
- async_services_.emplace_back(
- new NamedService<AsynchronousService>(addr, service));
+ Service* service) {
+ services_.emplace_back(new NamedService(addr, service));
}
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@@ -96,14 +85,14 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr,
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- bool thread_pool_owned = false;
- if (!async_services_.empty() && !services_.empty()) {
- gpr_log(GPR_ERROR, "Mixing async and sync services is unsupported for now");
- return nullptr;
- }
- if (!thread_pool_ && !services_.empty()) {
- thread_pool_ = CreateDefaultThreadPool();
- thread_pool_owned = true;
+ std::unique_ptr<ThreadPoolInterface> thread_pool;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_synchronous_methods()) {
+ if (thread_pool == nullptr) {
+ thread_pool.reset(CreateDefaultThreadPool());
+ break;
+ }
+ }
}
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
@@ -115,7 +104,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
- new Server(thread_pool_, thread_pool_owned, max_message_size_, args));
+ new Server(thread_pool.release(), true, max_message_size_, args));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
@@ -126,13 +115,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
- for (auto service = async_services_.begin(); service != async_services_.end();
- service++) {
- if (!server->RegisterAsyncService((*service)->host.get(),
- (*service)->service)) {
- return nullptr;
- }
- }
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
}
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 62023b24fd..320a064592 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -180,21 +180,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
- // It is currently unsupported to mix sync and async services
- // in the same server, so first test that (for coverage)
- ServerBuilder build_bad;
- build_bad.AddListeningPort(server_address_.str(),
- grpc::InsecureServerCredentials());
- build_bad.RegisterAsyncService(&service_);
- grpc::testing::TestService::Service sync_service;
- build_bad.RegisterService(&sync_service);
- GPR_ASSERT(build_bad.BuildAndStart() == nullptr);
-
// Setup server
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(),
grpc::InsecureServerCredentials());
- builder.RegisterAsyncService(&service_);
+ builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
}
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 3e2317c6d4..47e902cc5d 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -49,10 +49,10 @@
#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
+#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/util/create_test_channel.h"
-#include "src/proto/grpc/testing/services.grpc.pb.h"
namespace grpc {
namespace testing {
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 1ae88d7323..50be679caf 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -350,7 +350,7 @@ class AsyncQpsServerTest : public Server {
static void RegisterBenchmarkService(ServerBuilder *builder,
BenchmarkService::AsyncService *service) {
- builder->RegisterAsyncService(service);
+ builder->RegisterService(service);
}
static void RegisterGenericService(ServerBuilder *builder,
grpc::AsyncGenericService *service) {