aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Makefile6
-rw-r--r--build.json10
-rw-r--r--include/grpc++/completion_queue.h3
-rw-r--r--include/grpc++/impl/call.h1
-rw-r--r--include/grpc++/server.h19
-rw-r--r--include/grpc++/server_context.h4
-rw-r--r--src/cpp/server/server.cc125
-rw-r--r--src/cpp/server/server_context.cc (renamed from src/cpp/server/server_context_impl.cc)2
-rw-r--r--src/cpp/server/server_context_impl.h61
9 files changed, 133 insertions, 98 deletions
diff --git a/Makefile b/Makefile
index 622181b15b..ea0ce66fbe 100644
--- a/Makefile
+++ b/Makefile
@@ -2645,7 +2645,7 @@ LIBGRPC++_SRC = \
src/cpp/proto/proto_utils.cc \
src/cpp/server/server.cc \
src/cpp/server/server_builder.cc \
- src/cpp/server/server_context_impl.cc \
+ src/cpp/server/server_context.cc \
src/cpp/server/server_credentials.cc \
src/cpp/server/thread_pool.cc \
src/cpp/util/status.cc \
@@ -2702,7 +2702,7 @@ src/cpp/common/rpc_method.cc: $(OPENSSL_DEP)
src/cpp/proto/proto_utils.cc: $(OPENSSL_DEP)
src/cpp/server/server.cc: $(OPENSSL_DEP)
src/cpp/server/server_builder.cc: $(OPENSSL_DEP)
-src/cpp/server/server_context_impl.cc: $(OPENSSL_DEP)
+src/cpp/server/server_context.cc: $(OPENSSL_DEP)
src/cpp/server/server_credentials.cc: $(OPENSSL_DEP)
src/cpp/server/thread_pool.cc: $(OPENSSL_DEP)
src/cpp/util/status.cc: $(OPENSSL_DEP)
@@ -2760,7 +2760,7 @@ objs/$(CONFIG)/src/cpp/common/rpc_method.o:
objs/$(CONFIG)/src/cpp/proto/proto_utils.o:
objs/$(CONFIG)/src/cpp/server/server.o:
objs/$(CONFIG)/src/cpp/server/server_builder.o:
-objs/$(CONFIG)/src/cpp/server/server_context_impl.o:
+objs/$(CONFIG)/src/cpp/server/server_context.o:
objs/$(CONFIG)/src/cpp/server/server_credentials.o:
objs/$(CONFIG)/src/cpp/server/thread_pool.o:
objs/$(CONFIG)/src/cpp/util/status.o:
diff --git a/build.json b/build.json
index e6993acd6e..77a737031a 100644
--- a/build.json
+++ b/build.json
@@ -428,7 +428,7 @@
"src/cpp/proto/proto_utils.cc",
"src/cpp/server/server.cc",
"src/cpp/server/server_builder.cc",
- "src/cpp/server/server_context_impl.cc",
+ "src/cpp/server/server_context.cc",
"src/cpp/server/server_credentials.cc",
"src/cpp/server/thread_pool.cc",
"src/cpp/util/status.cc",
@@ -1621,7 +1621,6 @@
{
"name": "qps_client",
"build": "test",
- "run": false,
"language": "c++",
"src": [
"test/cpp/qps/qpstest.proto",
@@ -1634,12 +1633,12 @@
"grpc",
"gpr_test_util",
"gpr"
- ]
+ ],
+ "run": false
},
{
"name": "qps_server",
"build": "test",
- "run": false,
"language": "c++",
"src": [
"test/cpp/qps/qpstest.proto",
@@ -1652,7 +1651,8 @@
"grpc",
"gpr_test_util",
"gpr"
- ]
+ ],
+ "run": false
},
{
"name": "ruby_plugin",
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index c976bd5b45..7f0677b4e5 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -54,6 +54,7 @@ template <class R, class W>
class ServerReaderWriter;
class CompletionQueue;
+class Server;
class CompletionQueueTag {
public:
@@ -67,6 +68,7 @@ class CompletionQueueTag {
class CompletionQueue {
public:
CompletionQueue();
+ explicit CompletionQueue(grpc_completion_queue *take);
~CompletionQueue();
// Blocking read from queue.
@@ -87,6 +89,7 @@ class CompletionQueue {
template <class R> friend class ::grpc::ServerReader;
template <class W> friend class ::grpc::ServerWriter;
template <class R, class W> friend class ::grpc::ServerReaderWriter;
+ friend class ::grpc::Server;
friend Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index edc6555b0c..11e193eec1 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -67,6 +67,7 @@ class CallOpBuffer final : public CompletionQueueTag {
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
void AddClientRecvStatus(Status *status);
+ void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, const Status& status);
// INTERNAL API:
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index eefd4457f9..b02c4130d9 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -69,24 +69,7 @@ class Server {
private:
friend class ServerBuilder;
- class MethodRequestData {
- public:
- MethodRequestData(RpcServiceMethod* method, void* tag) : method_(method), tag_(tag) {}
- static MethodRequestData *Wait(CompletionQueue *cq);
-
- void Request(CompletionQueue* cq);
-
- class CallData {
- public:
- explicit CallData(MethodRequestData *mrd);
-
- void Run();
- };
-
- private:
- RpcServiceMethod *const method_;
- void *const tag_;
- };
+ class MethodRequestData;
// ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds);
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 4af9fd6aaa..a58e63aff2 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -39,11 +39,15 @@
#include "config.h"
+struct grpc_metadata;
+struct gpr_timespec;
+
namespace grpc {
// Interface of server side rpc context.
class ServerContext {
public:
+ ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count);
virtual ~ServerContext() {}
std::chrono::system_clock::time_point absolute_deadline();
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index f5bbfdc6f7..02fb383394 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -43,9 +43,12 @@
#include <grpc++/server_credentials.h>
#include <grpc++/thread_pool_interface.h>
+#include "src/cpp/proto/proto_utils.h"
+
namespace grpc {
-Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerCredentials *creds)
+Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned,
+ ServerCredentials *creds)
: started_(false),
shutdown_(false),
num_running_cb_(0),
@@ -53,8 +56,7 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerC
thread_pool_owned_(thread_pool_owned),
secure_(creds != nullptr) {
if (creds) {
- server_ =
- grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr);
+ server_ = grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr);
} else {
server_ = grpc_server_create(nullptr, nullptr);
}
@@ -87,7 +89,8 @@ bool Server::RegisterService(RpcService *service) {
RpcServiceMethod *method = service->GetMethod(i);
void *tag = grpc_server_register_method(server_, method->name(), nullptr);
if (!tag) {
- gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name());
+ gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
+ method->name());
return false;
}
methods_.emplace_back(method, tag);
@@ -104,6 +107,105 @@ int Server::AddPort(const grpc::string &addr) {
}
}
+class Server::MethodRequestData final : public CompletionQueueTag {
+ public:
+ MethodRequestData(RpcServiceMethod *method, void *tag)
+ : method_(method),
+ tag_(tag),
+ has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
+ method->method_type() ==
+ RpcMethod::SERVER_STREAMING),
+ has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
+ method->method_type() ==
+ RpcMethod::CLIENT_STREAMING) {
+ grpc_metadata_array_init(&request_metadata_);
+ }
+
+ static MethodRequestData *Wait(CompletionQueue *cq, bool *ok) {
+ void *tag;
+ if (!cq->Next(&tag, ok)) {
+ return nullptr;
+ }
+ auto *mrd = static_cast<MethodRequestData *>(tag);
+ GPR_ASSERT(mrd->in_flight_);
+ return mrd;
+ }
+
+ void Request(grpc_server *server, CompletionQueue *cq) {
+ GPR_ASSERT(!in_flight_);
+ in_flight_ = true;
+ cq_ = grpc_completion_queue_create();
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_registered_call(
+ server, tag_, &call_, &deadline_, &request_metadata_,
+ has_request_payload_ ? &request_payload_ : nullptr, cq->cq(),
+ cq_, this));
+ }
+
+ void FinalizeResult(void *tag, bool *status) override {}
+
+ class CallData {
+ public:
+ explicit CallData(MethodRequestData *mrd)
+ : cq_(mrd->cq_),
+ call_(mrd->call_, nullptr, &cq_),
+ ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
+ mrd->request_metadata_.count),
+ has_request_payload_(mrd->has_request_payload_),
+ has_response_payload_(mrd->has_response_payload_),
+ request_payload_(mrd->request_payload_),
+ method_(mrd->method_) {
+ GPR_ASSERT(mrd->in_flight_);
+ mrd->in_flight_ = false;
+ mrd->request_metadata_.count = 0;
+ }
+
+ void Run() {
+ std::unique_ptr<google::protobuf::Message> req;
+ std::unique_ptr<google::protobuf::Message> res;
+ if (has_request_payload_) {
+ req.reset(method_->AllocateRequestProto());
+ if (!DeserializeProto(request_payload_, req.get())) {
+ abort(); // for now
+ }
+ }
+ if (has_response_payload_) {
+ req.reset(method_->AllocateResponseProto());
+ }
+ auto status = method_->handler()->RunHandler(
+ MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
+ CallOpBuffer buf;
+ buf.AddServerSendStatus(nullptr, status);
+ if (has_response_payload_) {
+ buf.AddSendMessage(*res);
+ }
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ }
+
+ private:
+ CompletionQueue cq_;
+ Call call_;
+ ServerContext ctx_;
+ const bool has_request_payload_;
+ const bool has_response_payload_;
+ grpc_byte_buffer *request_payload_;
+ RpcServiceMethod *const method_;
+ };
+
+ private:
+ RpcServiceMethod *const method_;
+ void *const tag_;
+ bool in_flight_ = false;
+ const bool has_request_payload_;
+ const bool has_response_payload_;
+ grpc_call *call_;
+ gpr_timespec deadline_;
+ grpc_metadata_array request_metadata_;
+ grpc_byte_buffer *request_payload_;
+ grpc_completion_queue *cq_;
+};
+
bool Server::Start() {
GPR_ASSERT(!started_);
started_ = true;
@@ -111,8 +213,8 @@ bool Server::Start() {
// Start processing rpcs.
if (cq_sync_) {
- for (auto& m : methods_) {
- m.Request(cq_sync_.get());
+ for (auto &m : methods_) {
+ m.Request(server_, cq_sync_.get());
}
ScheduleCallback();
@@ -146,14 +248,17 @@ void Server::ScheduleCallback() {
void Server::RunRpc() {
// Wait for one more incoming rpc.
- auto* mrd = MethodRequestData::Wait(cq_sync_.get());
+ bool ok;
+ auto *mrd = MethodRequestData::Wait(cq_sync_.get(), &ok);
if (mrd) {
MethodRequestData::CallData cd(mrd);
- mrd->Request(cq_sync_.get());
- ScheduleCallback();
+ if (ok) {
+ mrd->Request(server_, cq_sync_.get());
+ ScheduleCallback();
- cd.Run();
+ cd.Run();
+ }
}
{
diff --git a/src/cpp/server/server_context_impl.cc b/src/cpp/server/server_context.cc
index 467cc80e05..0edadd8709 100644
--- a/src/cpp/server/server_context_impl.cc
+++ b/src/cpp/server/server_context.cc
@@ -31,6 +31,6 @@
*
*/
-#include "src/cpp/server/server_context_impl.h"
+#include <grpc++/server_context.h>
namespace grpc {} // namespace grpc
diff --git a/src/cpp/server/server_context_impl.h b/src/cpp/server/server_context_impl.h
deleted file mode 100644
index c6016b7635..0000000000
--- a/src/cpp/server/server_context_impl.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_
-#define __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_
-
-#include <grpc++/server_context.h>
-
-#include <chrono>
-
-#include <grpc/support/time.h>
-
-namespace grpc {
-
-class ServerContextImpl : public ServerContext {
- public:
- explicit ServerContextImpl(std::chrono::system_clock::time_point deadline)
- : absolute_deadline_(deadline) {}
- ~ServerContextImpl() {}
-
- std::chrono::system_clock::time_point absolute_deadline() const {
- return absolute_deadline_;
- }
-
- private:
- std::chrono::system_clock::time_point absolute_deadline_;
-};
-
-} // namespace grpc
-
-#endif // __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_