diff options
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r-- | src/cpp/server/server_cc.cc | 74 |
1 files changed, 41 insertions, 33 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 6bd3ecda32..6480482774 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -90,7 +90,8 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> +typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus> UnimplementedAsyncResponseOp; class Server::UnimplementedAsyncResponse final : public UnimplementedAsyncResponseOp { @@ -108,12 +109,12 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public CompletionQueueTag { +class ShutdownTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } }; -class DummyTag : public CompletionQueueTag { +class DummyTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { *status = true; @@ -121,15 +122,15 @@ class DummyTag : public CompletionQueueTag { } }; -class Server::SyncRequest final : public CompletionQueueTag { +class Server::SyncRequest final : public internal::CompletionQueueTag { public: - SyncRequest(RpcServiceMethod* method, void* tag) + SyncRequest(internal::RpcServiceMethod* method, void* tag) : method_(method), tag_(tag), in_flight_(false), - has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::SERVER_STREAMING), + has_request_payload_( + method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -212,14 +213,14 @@ class Server::SyncRequest final : public CompletionQueueTag { void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); + method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; cq_.Shutdown(); - CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ @@ -229,15 +230,15 @@ class Server::SyncRequest final : public CompletionQueueTag { private: CompletionQueue cq_; - Call call_; + internal::Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; }; private: - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; void* const tag_; bool in_flight_; const bool has_request_payload_; @@ -266,8 +267,11 @@ class Server::SyncRequestThreadManager : public ThreadManager { WorkStatus PollForWork(void** tag, bool* ok) override { *tag = nullptr; + // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working + // right now gpr_timespec deadline = - gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN); + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN)); switch (server_cq_->AsyncNext(tag, ok, deadline)) { case CompletionQueue::TIMEOUT: @@ -308,14 +312,15 @@ class Server::SyncRequestThreadManager : public ThreadManager { // object } - void AddSyncMethod(RpcServiceMethod* method, void* tag) { + void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + unknown_method_.reset(new internal::RpcServiceMethod( + "unknown", internal::RpcMethod::BIDI_STREAMING, + new internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -352,8 +357,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<RpcServiceMethod> unknown_method_; - std::unique_ptr<RpcServiceMethod> health_check_; + std::unique_ptr<internal::RpcServiceMethod> unknown_method_; + std::unique_ptr<internal::RpcServiceMethod> health_check_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -436,13 +441,13 @@ std::shared_ptr<Channel> Server::InProcessChannel( } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - RpcServiceMethod* method) { + internal::RpcServiceMethod* method) { switch (method->method_type()) { - case RpcMethod::NORMAL_RPC: - case RpcMethod::SERVER_STREAMING: + case internal::RpcMethod::NORMAL_RPC: + case internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case RpcMethod::CLIENT_STREAMING: - case RpcMethod::BIDI_STREAMING: + case internal::RpcMethod::CLIENT_STREAMING: + case internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -463,7 +468,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { continue; } - RpcServiceMethod* method = it->get(); + internal::RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); @@ -603,7 +608,8 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -619,8 +625,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, - bool delete_on_finalize) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag, bool delete_on_finalize) : server_(server), context_(context), stream_(stream), @@ -642,7 +648,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } context_->set_call(call_); context_->cq_ = call_cq_; - Call call(call_, server_, call_cq_, server_->max_receive_message_size()); + internal::Call call(call_, server_, call_cq_, + server_->max_receive_message_size()); if (*status && call_) { context_->BeginCompletionOp(&call); } @@ -657,7 +664,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag) : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( @@ -672,7 +680,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest( ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : BaseAsyncRequest(server, context, stream, call_cq, tag, delete_on_finalize) { @@ -715,7 +723,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { Status status(StatusCode::UNIMPLEMENTED, ""); - UnknownMethodHandler::FillOps(request_->context(), this); + internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } |