aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server.cc
diff options
context:
space:
mode:
authorGravatar Hongyu Chen <hongyu@google.com>2015-08-25 14:44:15 -0700
committerGravatar Hongyu Chen <hongyu@google.com>2015-08-25 14:44:15 -0700
commit011ea49592e71e1db3ef43a094aa9b452ff21e67 (patch)
tree70017b24ed2d3736552025680d4a0f138adf456f /src/cpp/server/server.cc
parenta96ce800a8df5c62ffd264317836ecf3433c4344 (diff)
parent1b481b64be43bd4c7655b25422344a17b2f198d9 (diff)
Merge remote-tracking branch 'upstream/master' into timespec
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r--src/cpp/server/server.cc106
1 files changed, 91 insertions, 15 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index e039c07374..66cd27cc33 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -32,24 +32,71 @@
*/
#include <grpc++/server.h>
+
#include <utility>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
-#include <grpc++/async_generic_service.h>
+#include <grpc++/generic/async_generic_service.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
-#include <grpc++/thread_pool_interface.h>
-#include <grpc++/time.h>
+#include <grpc++/support/time.h>
#include "src/core/profiling/timers.h"
+#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
+class Server::UnimplementedAsyncRequestContext {
+ protected:
+ UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
+
+ GenericServerContext server_context_;
+ GenericServerAsyncReaderWriter generic_stream_;
+};
+
+class Server::UnimplementedAsyncRequest GRPC_FINAL
+ : public UnimplementedAsyncRequestContext,
+ public GenericAsyncRequest {
+ public:
+ UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
+ : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
+ NULL, false),
+ server_(server),
+ cq_(cq) {}
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+ ServerContext* context() { return &server_context_; }
+ GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
+
+ private:
+ Server* const server_;
+ ServerCompletionQueue* const cq_;
+};
+
+typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+ UnimplementedAsyncResponseOp;
+class Server::UnimplementedAsyncResponse GRPC_FINAL
+ : public UnimplementedAsyncResponseOp {
+ public:
+ UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
+ ~UnimplementedAsyncResponse() { delete request_; }
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
+ delete this;
+ return r;
+ }
+
+ private:
+ UnimplementedAsyncRequest* const request_;
+};
+
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
@@ -297,18 +344,23 @@ int Server::AddListeningPort(const grpc::string& addr,
return creds->AddPortToServer(addr, server_);
}
-bool Server::Start() {
+bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GPR_ASSERT(!started_);
started_ = true;
grpc_server_start(server_);
if (!has_generic_service_) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- // Use of emplace_back with just constructor arguments is not accepted here
- // by gcc-4.4 because it can't match the anonymous nullptr with a proper
- // constructor implicitly. Construct the object and use push_back.
- sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ if (!sync_methods_->empty()) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted
+ // here by gcc-4.4 because it can't match the anonymous nullptr with a
+ // proper constructor implicitly. Construct the object and use push_back.
+ sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
+ for (size_t i = 0; i < num_cqs; i++) {
+ new UnimplementedAsyncRequest(this, cqs[i]);
+ }
}
// Start processing rpcs.
if (!sync_methods_->empty()) {
@@ -370,12 +422,14 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
Server::BaseAsyncRequest::BaseAsyncRequest(
Server* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
+ bool delete_on_finalize)
: server_(server),
context_(context),
stream_(stream),
call_cq_(call_cq),
tag_(tag),
+ delete_on_finalize_(delete_on_finalize),
call_(nullptr) {
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
@@ -402,14 +456,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
// just the pointers inside call are copied here
stream_->BindCall(&call);
*tag = tag_;
- delete this;
+ if (delete_on_finalize_) {
+ delete this;
+ }
return true;
}
Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
- : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+ : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void Server::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
@@ -423,8 +479,9 @@ void Server::RegisteredAsyncRequest::IssueRequest(
Server::GenericAsyncRequest::GenericAsyncRequest(
Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
- : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+ ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag,
+ delete_on_finalize) {
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
@@ -445,6 +502,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
return BaseAsyncRequest::FinalizeResult(tag, status);
}
+bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
+ if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
+ new UnimplementedAsyncRequest(server_, cq_);
+ new UnimplementedAsyncResponse(this);
+ } else {
+ delete this;
+ }
+ return false;
+}
+
+Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
+ UnimplementedAsyncRequest* request)
+ : request_(request) {
+ Status status(StatusCode::UNIMPLEMENTED, "");
+ UnknownMethodHandler::FillOps(request_->context(), this);
+ request_->stream()->call_.PerformOps(this);
+}
+
void Server::ScheduleCallback() {
{
grpc::unique_lock<grpc::mutex> lock(mu_);