From 15f383c6cc74e94cdc7223552824135acab00e7f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 7 Jan 2016 12:45:32 -0800 Subject: Exploratory refactoring for mixed sync/async methods on the same C++ service --- include/grpc++/impl/method_handler_impl.h | 203 ++++++++++++++++++++++++++++++ include/grpc++/impl/rpc_service_method.h | 186 +-------------------------- include/grpc++/impl/service_type.h | 78 ++++++------ 3 files changed, 251 insertions(+), 216 deletions(-) create mode 100644 include/grpc++/impl/method_handler_impl.h (limited to 'include/grpc++/impl') diff --git a/include/grpc++/impl/method_handler_impl.h b/include/grpc++/impl/method_handler_impl.h new file mode 100644 index 0000000000..8f7121b915 --- /dev/null +++ b/include/grpc++/impl/method_handler_impl.h @@ -0,0 +1,203 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_METHOD_HANDLER_IMPL_H +#define GRPCXX_IMPL_METHOD_HANDLER_IMPL_H + +#include +#include + +namespace grpc { + +// A wrapper class of an application provided rpc method handler. +template +class RpcMethodHandler : public MethodHandler { + public: + RpcMethodHandler( + std::function func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits::Deserialize( + param.request, &req, param.max_message_size); + ResponseType rsp; + if (status.ok()) { + status = func_(service_, param.server_context, &req, &rsp); + } + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.ok()) { + status = ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + // Application provided rpc handler function. + std::function func_; + // The class the above handler function lives in. + ServiceType* service_; +}; + +// A wrapper class of an application provided client streaming handler. +template +class ClientStreamingHandler : public MethodHandler { + public: + ClientStreamingHandler( + std::function*, ResponseType*)> func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + ServerReader reader(param.call, param.server_context); + ResponseType rsp; + Status status = func_(service_, param.server_context, &reader, &rsp); + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.ok()) { + status = ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + std::function*, + ResponseType*)> func_; + ServiceType* service_; +}; + +// A wrapper class of an application provided server streaming handler. +template +class ServerStreamingHandler : public MethodHandler { + public: + ServerStreamingHandler( + std::function*)> func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits::Deserialize( + param.request, &req, param.max_message_size); + + if (status.ok()) { + ServerWriter writer(param.call, param.server_context); + status = func_(service_, param.server_context, &req, &writer); + } + + CallOpSet ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + std::function*)> func_; + ServiceType* service_; +}; + +// A wrapper class of an application provided bidi-streaming handler. +template +class BidiStreamingHandler : public MethodHandler { + public: + BidiStreamingHandler( + std::function*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + ServerReaderWriter stream(param.call, + param.server_context); + Status status = func_(service_, param.server_context, &stream); + + CallOpSet ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + std::function*)> func_; + ServiceType* service_; +}; + +// Handle unknown method by returning UNIMPLEMENTED error. +class UnknownMethodHandler : public MethodHandler { + public: + template + static void FillOps(ServerContext* context, T* ops) { + Status status(StatusCode::UNIMPLEMENTED, ""); + if (!context->sent_initial_metadata_) { + ops->SendInitialMetadata(context->initial_metadata_); + context->sent_initial_metadata_ = true; + } + ops->ServerSendStatus(context->trailing_metadata_, status); + } + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + CallOpSet ops; + FillOps(param.server_context, &ops); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_METHOD_HANDLER_IMPL_H \ No newline at end of file diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index b203c8f53a..c6451dcbc3 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -43,7 +43,6 @@ #include #include #include -#include namespace grpc { class ServerContext; @@ -71,197 +70,24 @@ class MethodHandler { virtual void RunHandler(const HandlerParameter& param) = 0; }; -// A wrapper class of an application provided rpc method handler. -template -class RpcMethodHandler : public MethodHandler { - public: - RpcMethodHandler( - std::function func, - ServiceType* service) - : func_(func), service_(service) {} - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - RequestType req; - Status status = SerializationTraits::Deserialize( - param.request, &req, param.max_message_size); - ResponseType rsp; - if (status.ok()) { - status = func_(service_, param.server_context, &req, &rsp); - } - - GPR_ASSERT(!param.server_context->sent_initial_metadata_); - CallOpSet ops; - ops.SendInitialMetadata(param.server_context->initial_metadata_); - if (status.ok()) { - status = ops.SendMessage(rsp); - } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - - private: - // Application provided rpc handler function. - std::function func_; - // The class the above handler function lives in. - ServiceType* service_; -}; - -// A wrapper class of an application provided client streaming handler. -template -class ClientStreamingHandler : public MethodHandler { - public: - ClientStreamingHandler( - std::function*, ResponseType*)> func, - ServiceType* service) - : func_(func), service_(service) {} - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerReader reader(param.call, param.server_context); - ResponseType rsp; - Status status = func_(service_, param.server_context, &reader, &rsp); - - GPR_ASSERT(!param.server_context->sent_initial_metadata_); - CallOpSet ops; - ops.SendInitialMetadata(param.server_context->initial_metadata_); - if (status.ok()) { - status = ops.SendMessage(rsp); - } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - - private: - std::function*, - ResponseType*)> func_; - ServiceType* service_; -}; - -// A wrapper class of an application provided server streaming handler. -template -class ServerStreamingHandler : public MethodHandler { - public: - ServerStreamingHandler( - std::function*)> func, - ServiceType* service) - : func_(func), service_(service) {} - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - RequestType req; - Status status = SerializationTraits::Deserialize( - param.request, &req, param.max_message_size); - - if (status.ok()) { - ServerWriter writer(param.call, param.server_context); - status = func_(service_, param.server_context, &req, &writer); - } - - CallOpSet ops; - if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); - } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - - private: - std::function*)> func_; - ServiceType* service_; -}; - -// A wrapper class of an application provided bidi-streaming handler. -template -class BidiStreamingHandler : public MethodHandler { - public: - BidiStreamingHandler( - std::function*)> - func, - ServiceType* service) - : func_(func), service_(service) {} - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerReaderWriter stream(param.call, - param.server_context); - Status status = func_(service_, param.server_context, &stream); - - CallOpSet ops; - if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); - } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - - private: - std::function*)> func_; - ServiceType* service_; -}; - -// Handle unknown method by returning UNIMPLEMENTED error. -class UnknownMethodHandler : public MethodHandler { - public: - template - static void FillOps(ServerContext* context, T* ops) { - Status status(StatusCode::UNIMPLEMENTED, ""); - if (!context->sent_initial_metadata_) { - ops->SendInitialMetadata(context->initial_metadata_); - context->sent_initial_metadata_ = true; - } - ops->ServerSendStatus(context->trailing_metadata_, status); - } - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - CallOpSet ops; - FillOps(param.server_context, &ops); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } -}; - // Server side rpc method class class RpcServiceMethod : public RpcMethod { public: // Takes ownership of the handler RpcServiceMethod(const char* name, RpcMethod::RpcType type, MethodHandler* handler) - : RpcMethod(name, type), handler_(handler) {} + : RpcMethod(name, type), server_tag_(nullptr), handler_(handler) {} - MethodHandler* handler() { return handler_.get(); } + void set_server_tag(void* tag) { server_tag_ = tag; } + void* server_tag() const { return server_tag_; } + // if MethodHandler is nullptr, then this is an async method + MethodHandler* handler() const { return handler_.get(); } private: + void* server_tag_; std::unique_ptr handler_; }; -// This class contains all the method information for an rpc service. It is -// used for registering a service on a grpc server. -class RpcService { - public: - // Takes ownership. - void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); } - - RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); } - int GetMethodCount() const { - // On win x64, int is only 32bit - GPR_ASSERT(methods_.size() <= INT_MAX); - return (int)methods_.size(); - } - - private: - std::vector> methods_; -}; - } // namespace grpc #endif // GRPCXX_IMPL_RPC_SERVICE_METHOD_H diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index 3b6ac1de77..96b42c94fe 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -34,6 +34,7 @@ #ifndef GRPCXX_IMPL_SERVICE_TYPE_H #define GRPCXX_IMPL_SERVICE_TYPE_H +#include #include #include #include @@ -43,17 +44,10 @@ namespace grpc { class Call; class CompletionQueue; -class RpcService; class Server; class ServerCompletionQueue; class ServerContext; -class SynchronousService { - public: - virtual ~SynchronousService() {} - virtual RpcService* service() = 0; -}; - class ServerAsyncStreamingInterface { public: virtual ~ServerAsyncStreamingInterface() {} @@ -65,15 +59,27 @@ class ServerAsyncStreamingInterface { virtual void BindCall(Call* call) = 0; }; -class AsynchronousService { +class Service { public: - AsynchronousService(const char** method_names, size_t method_count) - : server_(nullptr), - method_names_(method_names), - method_count_(method_count), - request_args_(nullptr) {} + virtual ~Service() {} - ~AsynchronousService() { delete[] request_args_; } + bool has_async_methods() const { + for (auto it = methods_.begin(); it != methods_.end(); ++it) { + if ((*it)->handler() == nullptr) { + return true; + } + } + return false; + } + + bool has_synchronous_methods() const { + for (auto it = methods_.begin(); it != methods_.end(); ++it) { + if ((*it)->handler() != nullptr) { + return true; + } + } + return false; + } protected: template @@ -81,41 +87,41 @@ class AsynchronousService { ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestClientStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { - server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + void RequestAsyncClientStreaming(int index, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } template - void RequestServerStreaming(int index, ServerContext* context, - Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { - server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + void RequestAsyncServerStreaming(int index, ServerContext* context, + Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestBidiStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) { - server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + void RequestAsyncBidiStreaming(int index, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } private: friend class Server; + Server* server_; - const char** const method_names_; - size_t method_count_; - void** request_args_; + std::vector> methods_; }; } // namespace grpc -- cgit v1.2.3