diff options
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/server/rpc_service_method.h | 59 | ||||
-rw-r--r-- | src/cpp/server/server_context_impl.cc | 40 | ||||
-rw-r--r-- | src/cpp/server/server_context_impl.h | 49 | ||||
-rw-r--r-- | src/cpp/server/server_rpc_handler.cc | 28 | ||||
-rw-r--r-- | src/cpp/server/server_rpc_handler.h | 6 |
5 files changed, 144 insertions, 38 deletions
diff --git a/src/cpp/server/rpc_service_method.h b/src/cpp/server/rpc_service_method.h index 425545fd22..f4fe01c06b 100644 --- a/src/cpp/server/rpc_service_method.h +++ b/src/cpp/server/rpc_service_method.h @@ -45,6 +45,7 @@ #include <grpc++/stream.h> namespace grpc { +class ServerContext; class StreamContextInterface; // TODO(rocking): we might need to split this file into multiple ones. @@ -54,11 +55,19 @@ class MethodHandler { public: virtual ~MethodHandler() {} struct HandlerParameter { - HandlerParameter(const google::protobuf::Message* req, google::protobuf::Message* resp) - : request(req), response(resp), stream_context(nullptr) {} - HandlerParameter(const google::protobuf::Message* req, google::protobuf::Message* resp, - StreamContextInterface* context) - : request(req), response(resp), stream_context(context) {} + HandlerParameter(ServerContext* context, const google::protobuf::Message* req, + google::protobuf::Message* resp) + : server_context(context), + request(req), + response(resp), + stream_context(nullptr) {} + HandlerParameter(ServerContext* context, const google::protobuf::Message* req, + google::protobuf::Message* resp, StreamContextInterface* stream) + : server_context(context), + request(req), + response(resp), + stream_context(stream) {} + ServerContext* server_context; const google::protobuf::Message* request; google::protobuf::Message* response; StreamContextInterface* stream_context; @@ -70,20 +79,23 @@ class MethodHandler { template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler : public MethodHandler { public: - RpcMethodHandler(std::function<Status(ServiceType*, const RequestType*, - ResponseType*)> func, - ServiceType* service) + RpcMethodHandler( + std::function<Status(ServiceType*, ServerContext*, const RequestType*, + ResponseType*)> func, + ServiceType* service) : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { // Invoke application function, cast proto messages to their actual types. - return func_(service_, dynamic_cast<const RequestType*>(param.request), + return func_(service_, param.server_context, + dynamic_cast<const RequestType*>(param.request), dynamic_cast<ResponseType*>(param.response)); } private: // Application provided rpc handler function. - std::function<Status(ServiceType*, const RequestType*, ResponseType*)> func_; + std::function<Status(ServiceType*, ServerContext*, const RequestType*, + ResponseType*)> func_; // The class the above handler function lives in. ServiceType* service_; }; @@ -93,20 +105,20 @@ template <class ServiceType, class RequestType, class ResponseType> class ClientStreamingHandler : public MethodHandler { public: ClientStreamingHandler( - std::function<Status(ServiceType*, ServerReader<RequestType>*, - ResponseType*)> func, + std::function<Status(ServiceType*, ServerContext*, + ServerReader<RequestType>*, ResponseType*)> func, ServiceType* service) : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { ServerReader<RequestType> reader(param.stream_context); - return func_(service_, &reader, + return func_(service_, param.server_context, &reader, dynamic_cast<ResponseType*>(param.response)); } private: - std::function<Status(ServiceType*, ServerReader<RequestType>*, ResponseType*)> - func_; + std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*, + ResponseType*)> func_; ServiceType* service_; }; @@ -115,19 +127,19 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler : public MethodHandler { public: ServerStreamingHandler( - std::function<Status(ServiceType*, const RequestType*, + std::function<Status(ServiceType*, ServerContext*, const RequestType*, ServerWriter<ResponseType>*)> func, ServiceType* service) : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { ServerWriter<ResponseType> writer(param.stream_context); - return func_(service_, dynamic_cast<const RequestType*>(param.request), - &writer); + return func_(service_, param.server_context, + dynamic_cast<const RequestType*>(param.request), &writer); } private: - std::function<Status(ServiceType*, const RequestType*, + std::function<Status(ServiceType*, ServerContext*, const RequestType*, ServerWriter<ResponseType>*)> func_; ServiceType* service_; }; @@ -137,18 +149,19 @@ template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler : public MethodHandler { public: BidiStreamingHandler( - std::function<Status( - ServiceType*, ServerReaderWriter<ResponseType, RequestType>*)> func, + std::function<Status(ServiceType*, ServerContext*, + ServerReaderWriter<ResponseType, RequestType>*)> + func, ServiceType* service) : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { ServerReaderWriter<ResponseType, RequestType> stream(param.stream_context); - return func_(service_, &stream); + return func_(service_, param.server_context, &stream); } private: - std::function<Status(ServiceType*, + std::function<Status(ServiceType*, ServerContext*, ServerReaderWriter<ResponseType, RequestType>*)> func_; ServiceType* service_; }; diff --git a/src/cpp/server/server_context_impl.cc b/src/cpp/server/server_context_impl.cc new file mode 100644 index 0000000000..a229fcb6c9 --- /dev/null +++ b/src/cpp/server/server_context_impl.cc @@ -0,0 +1,40 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/server/server_context_impl.h" + +namespace grpc { + +ServerContextImpl::ServerContextImpl() {} + +} // namespace grpc diff --git a/src/cpp/server/server_context_impl.h b/src/cpp/server/server_context_impl.h new file mode 100644 index 0000000000..58c4293b4f --- /dev/null +++ b/src/cpp/server/server_context_impl.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_ +#define __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_ + +#include <grpc++/server_context.h> + +namespace grpc { + +class ServerContextImpl : public ServerContext { + public: + ServerContextImpl(); + ~ServerContextImpl() {} +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_ diff --git a/src/cpp/server/server_rpc_handler.cc b/src/cpp/server/server_rpc_handler.cc index 4c8d0cd04e..c32722f81a 100644 --- a/src/cpp/server/server_rpc_handler.cc +++ b/src/cpp/server/server_rpc_handler.cc @@ -35,14 +35,15 @@ #include <grpc/support/log.h> #include "src/cpp/server/rpc_service_method.h" +#include "src/cpp/server/server_context_impl.h" #include "src/cpp/stream/stream_context.h" #include <grpc++/async_server_context.h> namespace grpc { -ServerRpcHandler::ServerRpcHandler(AsyncServerContext* server_context, +ServerRpcHandler::ServerRpcHandler(AsyncServerContext* async_server_context, RpcServiceMethod* method) - : server_context_(server_context), method_(method) {} + : async_server_context_(async_server_context), method_(method) {} void ServerRpcHandler::StartRpc() { if (method_ == nullptr) { @@ -52,27 +53,29 @@ void ServerRpcHandler::StartRpc() { return; } + ServerContextImpl user_context; + if (method_->method_type() == RpcMethod::NORMAL_RPC) { // Start the rpc on this dedicated completion queue. - server_context_->Accept(cq_.cq()); + async_server_context_->Accept(cq_.cq()); // Allocate request and response. std::unique_ptr<google::protobuf::Message> request(method_->AllocateRequestProto()); std::unique_ptr<google::protobuf::Message> response(method_->AllocateResponseProto()); // Read request - server_context_->StartRead(request.get()); + async_server_context_->StartRead(request.get()); auto type = WaitForNextEvent(); GPR_ASSERT(type == CompletionQueue::SERVER_READ_OK); // Run the application's rpc handler MethodHandler* handler = method_->handler(); - Status status = handler->RunHandler( - MethodHandler::HandlerParameter(request.get(), response.get())); + Status status = handler->RunHandler(MethodHandler::HandlerParameter( + &user_context, request.get(), response.get())); if (status.IsOk()) { // Send the response if we get an ok status. - server_context_->StartWrite(*response, 0); + async_server_context_->StartWrite(*response, 0); type = WaitForNextEvent(); if (type != CompletionQueue::SERVER_WRITE_OK) { status = Status(StatusCode::INTERNAL, "Error writing response."); @@ -86,13 +89,13 @@ void ServerRpcHandler::StartRpc() { std::unique_ptr<google::protobuf::Message> request(method_->AllocateRequestProto()); std::unique_ptr<google::protobuf::Message> response(method_->AllocateResponseProto()); - StreamContext stream_context(*method_, server_context_->call(), cq_.cq(), - request.get(), response.get()); + StreamContext stream_context(*method_, async_server_context_->call(), + cq_.cq(), request.get(), response.get()); // Run the application's rpc handler MethodHandler* handler = method_->handler(); Status status = handler->RunHandler(MethodHandler::HandlerParameter( - request.get(), response.get(), &stream_context)); + &user_context, request.get(), response.get(), &stream_context)); if (status.IsOk() && method_->method_type() == RpcMethod::CLIENT_STREAMING) { stream_context.Write(response.get(), false); @@ -107,13 +110,14 @@ CompletionQueue::CompletionType ServerRpcHandler::WaitForNextEvent() { CompletionQueue::CompletionType type = cq_.Next(&tag); if (type != CompletionQueue::QUEUE_CLOSED && type != CompletionQueue::RPC_END) { - GPR_ASSERT(static_cast<AsyncServerContext*>(tag) == server_context_.get()); + GPR_ASSERT(static_cast<AsyncServerContext*>(tag) == + async_server_context_.get()); } return type; } void ServerRpcHandler::FinishRpc(const Status& status) { - server_context_->StartWriteStatus(status); + async_server_context_->StartWriteStatus(status); CompletionQueue::CompletionType type; // HALFCLOSE_OK and RPC_END events come in either order. diff --git a/src/cpp/server/server_rpc_handler.h b/src/cpp/server/server_rpc_handler.h index ca48fd74dd..249576d504 100644 --- a/src/cpp/server/server_rpc_handler.h +++ b/src/cpp/server/server_rpc_handler.h @@ -46,8 +46,8 @@ class RpcServiceMethod; class ServerRpcHandler { public: - // Takes ownership of server_context. - ServerRpcHandler(AsyncServerContext* server_context, + // Takes ownership of async_server_context. + ServerRpcHandler(AsyncServerContext* async_server_context, RpcServiceMethod* method); void StartRpc(); @@ -56,7 +56,7 @@ class ServerRpcHandler { CompletionQueue::CompletionType WaitForNextEvent(); void FinishRpc(const Status& status); - std::unique_ptr<AsyncServerContext> server_context_; + std::unique_ptr<AsyncServerContext> async_server_context_; RpcServiceMethod* method_; CompletionQueue cq_; }; |