diff options
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/async_server.cc | 89 | ||||
-rw-r--r-- | src/cpp/server/async_server_context.cc | 101 | ||||
-rw-r--r-- | src/cpp/server/completion_queue.cc | 113 | ||||
-rw-r--r-- | src/cpp/server/rpc_service_method.h | 131 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 166 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 66 | ||||
-rw-r--r-- | src/cpp/server/server_rpc_handler.cc | 108 | ||||
-rw-r--r-- | src/cpp/server/server_rpc_handler.h | 66 | ||||
-rw-r--r-- | src/cpp/server/thread_pool.cc | 77 | ||||
-rw-r--r-- | src/cpp/server/thread_pool.h | 64 |
10 files changed, 981 insertions, 0 deletions
diff --git a/src/cpp/server/async_server.cc b/src/cpp/server/async_server.cc new file mode 100644 index 0000000000..aae2c82050 --- /dev/null +++ b/src/cpp/server/async_server.cc @@ -0,0 +1,89 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/async_server.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <grpc++/completion_queue.h> + +namespace grpc { + +AsyncServer::AsyncServer(CompletionQueue* cc) + : started_(false), shutdown_(false) { + server_ = grpc_server_create(cc->cq(), nullptr); +} + +AsyncServer::~AsyncServer() { + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (started_ && !shutdown_) { + lock.unlock(); + Shutdown(); + } + grpc_server_destroy(server_); +} + +void AsyncServer::AddPort(const grpc::string& addr) { + GPR_ASSERT(!started_); + int success = grpc_server_add_http2_port(server_, addr.c_str()); + GPR_ASSERT(success); +} + +void AsyncServer::Start() { + GPR_ASSERT(!started_); + started_ = true; + grpc_server_start(server_); +} + +void AsyncServer::RequestOneRpc() { + GPR_ASSERT(started_); + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (shutdown_) { + return; + } + lock.unlock(); + grpc_call_error err = grpc_server_request_call(server_, nullptr); + GPR_ASSERT(err == GRPC_CALL_OK); +} + +void AsyncServer::Shutdown() { + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (started_ && !shutdown_) { + shutdown_ = true; + lock.unlock(); + // TODO(yangg) should we shutdown without start? + grpc_server_shutdown(server_); + } +} + +} // namespace grpc diff --git a/src/cpp/server/async_server_context.cc b/src/cpp/server/async_server_context.cc new file mode 100644 index 0000000000..b231f4b0cf --- /dev/null +++ b/src/cpp/server/async_server_context.cc @@ -0,0 +1,101 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/async_server_context.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include "src/cpp/proto/proto_utils.h" +#include <google/protobuf/message.h> +#include <grpc++/status.h> + +namespace grpc { + +AsyncServerContext::AsyncServerContext( + grpc_call* call, const grpc::string& method, const grpc::string& host, + system_clock::time_point absolute_deadline) + : method_(method), + host_(host), + absolute_deadline_(absolute_deadline), + request_(nullptr), + call_(call) { +} + +AsyncServerContext::~AsyncServerContext() { grpc_call_destroy(call_); } + +void AsyncServerContext::Accept(grpc_completion_queue* cq) { + grpc_call_accept(call_, cq, this, 0); +} + +bool AsyncServerContext::StartRead(google::protobuf::Message* request) { + GPR_ASSERT(request); + request_ = request; + grpc_call_error err = grpc_call_start_read(call_, this); + return err == GRPC_CALL_OK; +} + +bool AsyncServerContext::StartWrite(const google::protobuf::Message& response, + int flags) { + grpc_byte_buffer* buffer = nullptr; + if (!SerializeProto(response, &buffer)) { + return false; + } + grpc_call_error err = grpc_call_start_write(call_, buffer, this, flags); + grpc_byte_buffer_destroy(buffer); + return err == GRPC_CALL_OK; +} + +namespace { +grpc_status TranslateStatus(const Status& status) { + grpc_status c_status; + // TODO(yangg) + c_status.code = GRPC_STATUS_OK; + c_status.details = nullptr; + return c_status; +} +} // namespace + +bool AsyncServerContext::StartWriteStatus(const Status& status) { + grpc_status c_status = TranslateStatus(status); + grpc_call_error err = grpc_call_start_write_status(call_, c_status, this); + return err == GRPC_CALL_OK; +} + +bool AsyncServerContext::ParseRead(grpc_byte_buffer* read_buffer) { + GPR_ASSERT(request_); + bool success = DeserializeProto(read_buffer, request_); + request_ = nullptr; + return success; +} + +} // namespace grpc diff --git a/src/cpp/server/completion_queue.cc b/src/cpp/server/completion_queue.cc new file mode 100644 index 0000000000..04eb301f7e --- /dev/null +++ b/src/cpp/server/completion_queue.cc @@ -0,0 +1,113 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// TODO(yangg) maybe move to internal/common +#include <grpc++/completion_queue.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include "src/cpp/util/time.h" +#include <grpc++/async_server_context.h> + +namespace grpc { + +CompletionQueue::CompletionQueue() { cq_ = grpc_completion_queue_create(); } + +CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } + +void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } + +CompletionQueue::CompletionType CompletionQueue::Next(void** tag) { + grpc_event* ev; + CompletionType return_type; + bool success; + + ev = grpc_completion_queue_next(cq_, gpr_inf_future); + if (!ev) { + gpr_log(GPR_ERROR, "no next event in queue"); + abort(); + } + switch (ev->type) { + case GRPC_QUEUE_SHUTDOWN: + return_type = QUEUE_CLOSED; + break; + case GRPC_READ: + *tag = ev->tag; + if (ev->data.read) { + success = + static_cast<AsyncServerContext*>(ev->tag)->ParseRead(ev->data.read); + return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR; + } else { + return_type = SERVER_READ_ERROR; + } + break; + case GRPC_WRITE_ACCEPTED: + *tag = ev->tag; + if (ev->data.write_accepted != GRPC_OP_ERROR) { + return_type = SERVER_WRITE_OK; + } else { + return_type = SERVER_WRITE_ERROR; + } + break; + case GRPC_SERVER_RPC_NEW: + GPR_ASSERT(!ev->tag); + // Finishing the pending new rpcs after the server has been shutdown. + if (!ev->call) { + *tag = nullptr; + } else { + *tag = new AsyncServerContext(ev->call, ev->data.server_rpc_new.method, + ev->data.server_rpc_new.host, + AbsoluteDeadlineTimespec2Timepoint( + ev->data.server_rpc_new.deadline)); + } + return_type = SERVER_RPC_NEW; + break; + case GRPC_FINISHED: + *tag = ev->tag; + return_type = RPC_END; + break; + case GRPC_FINISH_ACCEPTED: + *tag = ev->tag; + return_type = HALFCLOSE_OK; + break; + default: + // We do not handle client side messages now + gpr_log(GPR_ERROR, "client-side messages aren't supported yet"); + abort(); + } + grpc_event_finish(ev); + return return_type; +} + +} // namespace grpc diff --git a/src/cpp/server/rpc_service_method.h b/src/cpp/server/rpc_service_method.h new file mode 100644 index 0000000000..ac2badda71 --- /dev/null +++ b/src/cpp/server/rpc_service_method.h @@ -0,0 +1,131 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_SERVER_RPC_SERVICE_METHOD_H__ +#define __GRPCPP_INTERNAL_SERVER_RPC_SERVICE_METHOD_H__ + +#include <functional> +#include <map> +#include <memory> +#include <vector> + +#include "src/cpp/rpc_method.h" +#include <google/protobuf/message.h> +#include <grpc++/status.h> + +namespace grpc { + +// TODO(rocking): we might need to split this file into multiple ones. + +// Base class for running an RPC handler. +class MethodHandler { + public: + virtual ~MethodHandler() {} + struct HandlerParameter { + HandlerParameter(const google::protobuf::Message* req, google::protobuf::Message* resp) + : request(req), response(resp) {} + const google::protobuf::Message* request; + google::protobuf::Message* response; + }; + virtual ::grpc::Status RunHandler(const HandlerParameter& param) = 0; +}; + +// A wrapper class of an application provided rpc method handler. +template <class ServiceType, class RequestType, class ResponseType> +class RpcMethodHandler : public MethodHandler { + public: + RpcMethodHandler(std::function<::grpc::Status( + ServiceType*, const RequestType*, ResponseType*)> func, + ServiceType* service) + : func_(func), service_(service) {} + + ::grpc::Status RunHandler(const HandlerParameter& param) final { + // Invoke application function, cast proto messages to their actual types. + return func_(service_, dynamic_cast<const RequestType*>(param.request), + dynamic_cast<ResponseType*>(param.response)); + } + + private: + // Application provided rpc handler function. + std::function<::grpc::Status(ServiceType*, const RequestType*, ResponseType*)> + func_; + // The class the above handler function lives in. + ServiceType* service_; +}; + +// Server side rpc method class +class RpcServiceMethod : public RpcMethod { + public: + // Takes ownership of the handler and two prototype objects. + RpcServiceMethod(const char* name, MethodHandler* handler, + google::protobuf::Message* request_prototype, + google::protobuf::Message* response_prototype) + : RpcMethod(name), + handler_(handler), + request_prototype_(request_prototype), + response_prototype_(response_prototype) {} + + MethodHandler* handler() { return handler_.get(); } + + google::protobuf::Message* AllocateRequestProto() { return request_prototype_->New(); } + google::protobuf::Message* AllocateResponseProto() { + return response_prototype_->New(); + } + + private: + std::unique_ptr<MethodHandler> handler_; + std::unique_ptr<google::protobuf::Message> request_prototype_; + std::unique_ptr<google::protobuf::Message> response_prototype_; +}; + +// This class contains all the method information for an rpc service. It is +// used for registering a service on a grpc server. +class RpcService { + public: + // Takes ownership. + void AddMethod(RpcServiceMethod* method) { + methods_.push_back(std::unique_ptr<RpcServiceMethod>(method)); + } + + RpcServiceMethod* GetMethod(int i) { + return methods_[i].get(); + } + int GetMethodCount() const { return methods_.size(); } + + private: + std::vector<std::unique_ptr<RpcServiceMethod>> methods_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_SERVER_RPC_SERVICE_METHOD_H__ diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc new file mode 100644 index 0000000000..9bf4073238 --- /dev/null +++ b/src/cpp/server/server.cc @@ -0,0 +1,166 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/server.h> +#include <utility> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include "src/cpp/server/rpc_service_method.h" +#include "src/cpp/server/server_rpc_handler.h" +#include "src/cpp/server/thread_pool.h" +#include <grpc++/async_server_context.h> +#include <grpc++/completion_queue.h> + +namespace grpc { + +// TODO(rocking): consider a better default value like num of cores. +static const int kNumThreads = 4; + +Server::Server(ThreadPoolInterface* thread_pool) + : started_(false), + shutdown_(false), + num_running_cb_(0), + thread_pool_(thread_pool == nullptr ? new ThreadPool(kNumThreads) + : thread_pool), + thread_pool_owned_(thread_pool == nullptr) { + server_ = grpc_server_create(cq_.cq(), nullptr); +} + +Server::Server() { + // Should not be called. + GPR_ASSERT(false); +} + +Server::~Server() { + std::unique_lock<std::mutex> lock(mu_); + if (started_ && !shutdown_) { + lock.unlock(); + Shutdown(); + } + grpc_server_destroy(server_); + if (thread_pool_owned_) { + delete thread_pool_; + } +} + +void Server::RegisterService(RpcService* service) { + for (int i = 0; i < service->GetMethodCount(); ++i) { + RpcServiceMethod* method = service->GetMethod(i); + method_map_.insert(std::make_pair(method->name(), method)); + } +} + +void Server::AddPort(const grpc::string& addr) { + GPR_ASSERT(!started_); + int success = grpc_server_add_http2_port(server_, addr.c_str()); + GPR_ASSERT(success); +} + +void Server::Start() { + GPR_ASSERT(!started_); + started_ = true; + grpc_server_start(server_); + + // Start processing rpcs. + ScheduleCallback(); +} + +void Server::AllowOneRpc() { + GPR_ASSERT(started_); + grpc_call_error err = grpc_server_request_call(server_, nullptr); + GPR_ASSERT(err == GRPC_CALL_OK); +} + +void Server::Shutdown() { + { + std::unique_lock<std::mutex> lock(mu_); + if (started_ && !shutdown_) { + shutdown_ = true; + grpc_server_shutdown(server_); + + // Wait for running callbacks to finish. + while (num_running_cb_ != 0) { + callback_cv_.wait(lock); + } + } + } + + // Shutdown the completion queue. + cq_.Shutdown(); + void* tag = nullptr; + CompletionQueue::CompletionType t = cq_.Next(&tag); + GPR_ASSERT(t == CompletionQueue::QUEUE_CLOSED); +} + +void Server::ScheduleCallback() { + { + std::unique_lock<std::mutex> lock(mu_); + num_running_cb_++; + } + std::function<void()> callback = std::bind(&Server::RunRpc, this); + thread_pool_->ScheduleCallback(callback); +} + +void Server::RunRpc() { + // Wait for one more incoming rpc. + void* tag = nullptr; + AllowOneRpc(); + CompletionQueue::CompletionType t = cq_.Next(&tag); + GPR_ASSERT(t == CompletionQueue::SERVER_RPC_NEW); + + AsyncServerContext* server_context = static_cast<AsyncServerContext*>(tag); + // server_context could be nullptr during server shutdown. + if (server_context != nullptr) { + // Schedule a new callback to handle more rpcs. + ScheduleCallback(); + + RpcServiceMethod* method = nullptr; + auto iter = method_map_.find(server_context->method()); + if (iter != method_map_.end()) { + method = iter->second; + } + ServerRpcHandler rpc_handler(server_context, method); + rpc_handler.StartRpc(); + } + + { + std::unique_lock<std::mutex> lock(mu_); + num_running_cb_--; + if (shutdown_) { + callback_cv_.notify_all(); + } + } +} + +} // namespace grpc diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc new file mode 100644 index 0000000000..d5d0689bc5 --- /dev/null +++ b/src/cpp/server/server_builder.cc @@ -0,0 +1,66 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/server_builder.h> + +#include <grpc++/server.h> + +namespace grpc { + +ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {} + +void ServerBuilder::RegisterService(RpcService* service) { + services_.push_back(service); +} + +void ServerBuilder::AddPort(const grpc::string& addr) { + ports_.push_back(addr); +} + +void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) { + thread_pool_ = thread_pool; +} + +std::unique_ptr<Server> ServerBuilder::BuildAndStart() { + std::unique_ptr<Server> server(new Server(thread_pool_)); + for (auto* service : services_) { + server->RegisterService(service); + } + for (auto& port : ports_) { + server->AddPort(port); + } + server->Start(); + return server; +} + +} // namespace grpc diff --git a/src/cpp/server/server_rpc_handler.cc b/src/cpp/server/server_rpc_handler.cc new file mode 100644 index 0000000000..2d5a081deb --- /dev/null +++ b/src/cpp/server/server_rpc_handler.cc @@ -0,0 +1,108 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/server/server_rpc_handler.h" + +#include <grpc/support/log.h> +#include "src/cpp/server/rpc_service_method.h" +#include <grpc++/async_server_context.h> + +namespace grpc { + +ServerRpcHandler::ServerRpcHandler(AsyncServerContext* server_context, + RpcServiceMethod* method) + : server_context_(server_context), + method_(method) { +} + +void ServerRpcHandler::StartRpc() { + // Start the rpc on this dedicated completion queue. + server_context_->Accept(cq_.cq()); + + if (method_ == nullptr) { + // Method not supported, finish the rpc with error. + // TODO(rocking): do we need to call read to consume the request? + FinishRpc(Status(StatusCode::UNIMPLEMENTED, "No such method.")); + return; + } + + // Allocate request and response. + std::unique_ptr<google::protobuf::Message> request(method_->AllocateRequestProto()); + std::unique_ptr<google::protobuf::Message> response(method_->AllocateResponseProto()); + + // Read request + server_context_->StartRead(request.get()); + auto type = WaitForNextEvent(); + GPR_ASSERT(type == CompletionQueue::SERVER_READ_OK); + + // Run the application's rpc handler + MethodHandler* handler = method_->handler(); + Status status = handler->RunHandler( + MethodHandler::HandlerParameter(request.get(), response.get())); + + if (status.IsOk()) { + // Send the response if we get an ok status. + server_context_->StartWrite(*response, 0); + type = WaitForNextEvent(); + if (type != CompletionQueue::SERVER_WRITE_OK) { + status = Status(StatusCode::INTERNAL, "Error writing response."); + } + } + + FinishRpc(status); +} + +CompletionQueue::CompletionType ServerRpcHandler::WaitForNextEvent() { + void* tag = nullptr; + CompletionQueue::CompletionType type = cq_.Next(&tag); + if (type != CompletionQueue::QUEUE_CLOSED && + type != CompletionQueue::RPC_END) { + GPR_ASSERT(static_cast<AsyncServerContext*>(tag) == server_context_.get()); + } + return type; +} + +void ServerRpcHandler::FinishRpc(const Status& status) { + server_context_->StartWriteStatus(status); + CompletionQueue::CompletionType type = WaitForNextEvent(); + // TODO(rocking): do we care about this return type? + + type = WaitForNextEvent(); + GPR_ASSERT(type == CompletionQueue::RPC_END); + + cq_.Shutdown(); + type = WaitForNextEvent(); + GPR_ASSERT(type == CompletionQueue::QUEUE_CLOSED); +} + +} // namespace grpc diff --git a/src/cpp/server/server_rpc_handler.h b/src/cpp/server/server_rpc_handler.h new file mode 100644 index 0000000000..ca48fd74dd --- /dev/null +++ b/src/cpp/server/server_rpc_handler.h @@ -0,0 +1,66 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ +#define __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ + +#include <memory> + +#include <grpc++/completion_queue.h> +#include <grpc++/status.h> + +namespace grpc { + +class AsyncServerContext; +class RpcServiceMethod; + +class ServerRpcHandler { + public: + // Takes ownership of server_context. + ServerRpcHandler(AsyncServerContext* server_context, + RpcServiceMethod* method); + + void StartRpc(); + + private: + CompletionQueue::CompletionType WaitForNextEvent(); + void FinishRpc(const Status& status); + + std::unique_ptr<AsyncServerContext> server_context_; + RpcServiceMethod* method_; + CompletionQueue cq_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc new file mode 100644 index 0000000000..ce364c4795 --- /dev/null +++ b/src/cpp/server/thread_pool.cc @@ -0,0 +1,77 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/server/thread_pool.h" + +namespace grpc { + +ThreadPool::ThreadPool(int num_threads) { + for (int i = 0; i < num_threads; i++) { + threads_.push_back(std::thread([=]() { + for (;;) { + std::unique_lock<std::mutex> lock(mu_); + // Wait until work is available or we are shutting down. + cv_.wait(lock, [=]() { return shutdown_ || !callbacks_.empty(); }); + // Drain callbacks before considering shutdown to ensure all work + // gets completed. + if (!callbacks_.empty()) { + auto cb = callbacks_.front(); + callbacks_.pop(); + lock.unlock(); + cb(); + } else if (shutdown_) { + return; + } + } + })); + } +} + +ThreadPool::~ThreadPool() { + { + std::lock_guard<std::mutex> lock(mu_); + shutdown_ = true; + cv_.notify_all(); + } + for (auto& t : threads_) { + t.join(); + } +} + +void ThreadPool::ScheduleCallback(const std::function<void()>& callback) { + std::lock_guard<std::mutex> lock(mu_); + callbacks_.push(callback); + cv_.notify_all(); +} + +} // namespace grpc diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h new file mode 100644 index 0000000000..6fc71d6695 --- /dev/null +++ b/src/cpp/server/thread_pool.h @@ -0,0 +1,64 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_SERVER_THREAD_POOL_H__ +#define __GRPCPP_INTERNAL_SERVER_THREAD_POOL_H__ + +#include <grpc++/thread_pool_interface.h> + +#include <condition_variable> +#include <thread> +#include <mutex> +#include <queue> +#include <vector> + +namespace grpc { + +class ThreadPool : public ThreadPoolInterface { + public: + explicit ThreadPool(int num_threads); + ~ThreadPool(); + + void ScheduleCallback(const std::function<void()>& callback) final; + + private: + std::mutex mu_; + std::condition_variable cv_; + bool shutdown_ = false; + std::queue<std::function<void()>> callbacks_; + std::vector<std::thread> threads_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_SERVER_THREAD_POOL_H__ |