diff options
author | Vijay Pai <vpai@google.com> | 2017-06-21 15:45:05 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2017-07-10 14:33:53 -0700 |
commit | c0baec60a1ca380425c5514d1df584a63b646305 (patch) | |
tree | 064d6aa900b778640e4ce6493fb9e596c1e3ebad /src/cpp | |
parent | 6c1418e1dd7f8fe770531bcc9ac821fbf1ae5f1b (diff) |
Internalize structs and methods meant for being exposed through codegen
or that interface with core and are only for internal use
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 12 | ||||
-rw-r--r-- | src/cpp/client/generic_stub.cc | 6 | ||||
-rw-r--r-- | src/cpp/common/completion_queue_cc.cc | 2 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.cc | 9 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.h | 2 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 69 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 8 |
7 files changed, 59 insertions, 49 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c9..038eb32e04 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -76,8 +76,9 @@ grpc::string Channel::GetServiceConfigJSON() const { &channel_info.service_config_json); } -Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) { +internal::Call Channel::CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = NULL; if (kRegistered) { @@ -109,10 +110,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, } grpc_census_call_set_context(c_call, context->census_context()); context->set_call(c_call, shared_from_this()); - return Call(c_call, this, cq); + return internal::Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -131,7 +133,7 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { } namespace { -class TagSaver final : public CompletionQueueTag { +class TagSaver final : public internal::CompletionQueueTag { public: explicit TagSaver(void* tag) : tag_(tag) {} ~TagSaver() override {} diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 66b1ef0e39..e65cb9903f 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -27,9 +27,11 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { return std::unique_ptr<GenericClientAsyncReaderWriter>( - GenericClientAsyncReaderWriter::Create( + GenericClientAsyncReaderWriter::internal::Create( channel_.get(), cq, - RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); + internal::RpcMethod(method.c_str(), + internal::RpcMethod::BIDI_STREAMING), + context, tag)); } } // namespace grpc diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index f34b0f3d58..000a03277b 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag); + auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; *tag = cq_tag; if (cq_tag->FinalizeResult(tag, ok)) { diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 815b607032..fc7feb79fe 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -36,11 +36,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( DefaultHealthCheckService* service) : service_(service), method_(nullptr) { - MethodHandler* handler = - new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>( + internal::MethodHandler* handler = + new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, + ByteBuffer>( std::mem_fn(&HealthCheckServiceImpl::Check), this); - method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC, - handler); + method_ = new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); AddMethod(method_); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 09d5cebe98..99d6680c50 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { private: const DefaultHealthCheckService* const service_; - RpcServiceMethod* method_; + internal::RpcServiceMethod* method_; }; DefaultHealthCheckService(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 04abb6fd3e..3bff9999b9 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -86,7 +86,8 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> +typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus> UnimplementedAsyncResponseOp; class Server::UnimplementedAsyncResponse final : public UnimplementedAsyncResponseOp { @@ -104,12 +105,12 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public CompletionQueueTag { +class ShutdownTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } }; -class DummyTag : public CompletionQueueTag { +class DummyTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { *status = true; @@ -117,15 +118,15 @@ class DummyTag : public CompletionQueueTag { } }; -class Server::SyncRequest final : public CompletionQueueTag { +class Server::SyncRequest final : public internal::CompletionQueueTag { public: - SyncRequest(RpcServiceMethod* method, void* tag) + SyncRequest(internal::RpcServiceMethod* method, void* tag) : method_(method), tag_(tag), in_flight_(false), - has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::SERVER_STREAMING), + has_request_payload_( + method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -202,14 +203,14 @@ class Server::SyncRequest final : public CompletionQueueTag { void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); + method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; cq_.Shutdown(); - CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ @@ -219,15 +220,15 @@ class Server::SyncRequest final : public CompletionQueueTag { private: CompletionQueue cq_; - Call call_; + internal::Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; }; private: - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; void* const tag_; bool in_flight_; const bool has_request_payload_; @@ -300,14 +301,15 @@ class Server::SyncRequestThreadManager : public ThreadManager { // object } - void AddSyncMethod(RpcServiceMethod* method, void* tag) { + void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + unknown_method_.reset(new internal::RpcServiceMethod( + "unknown", internal::RpcMethod::BIDI_STREAMING, + new internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -344,8 +346,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<RpcServiceMethod> unknown_method_; - std::unique_ptr<RpcServiceMethod> health_check_; + std::unique_ptr<internal::RpcServiceMethod> unknown_method_; + std::unique_ptr<internal::RpcServiceMethod> health_check_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -421,13 +423,13 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { grpc_server* Server::c_server() { return server_; } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - RpcServiceMethod* method) { + internal::RpcServiceMethod* method) { switch (method->method_type()) { - case RpcMethod::NORMAL_RPC: - case RpcMethod::SERVER_STREAMING: + case internal::RpcMethod::NORMAL_RPC: + case internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case RpcMethod::CLIENT_STREAMING: - case RpcMethod::BIDI_STREAMING: + case internal::RpcMethod::CLIENT_STREAMING: + case internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -448,7 +450,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { continue; } - RpcServiceMethod* method = it->get(); + internal::RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); @@ -588,7 +590,8 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -599,8 +602,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, - bool delete_on_finalize) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag, bool delete_on_finalize) : server_(server), context_(context), stream_(stream), @@ -622,7 +625,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } context_->set_call(call_); context_->cq_ = call_cq_; - Call call(call_, server_, call_cq_, server_->max_receive_message_size()); + internal::Call call(call_, server_, call_cq_, + server_->max_receive_message_size()); if (*status && call_) { context_->BeginCompletionOp(&call); } @@ -637,7 +641,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag) : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( @@ -651,7 +656,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest( ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : BaseAsyncRequest(server, context, stream, call_cq, tag, delete_on_finalize) { @@ -693,7 +698,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { Status status(StatusCode::UNIMPLEMENTED, ""); - UnknownMethodHandler::FillOps(request_->context(), this); + internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 4913682f1d..2e55ffbac4 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -37,7 +37,7 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp final : public CallOpSetInterface { +class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq CompletionOp() @@ -146,7 +146,7 @@ ServerContext::~ServerContext() { } } -void ServerContext::BeginCompletionOp(Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call) { GPR_ASSERT(!completion_op_); completion_op_ = new CompletionOp(); if (has_notify_when_done_tag_) { @@ -155,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) { call->PerformOps(completion_op_); } -CompletionQueueTag* ServerContext::GetCompletionOpTag() { - return static_cast<CompletionQueueTag*>(completion_op_); +internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() { + return static_cast<internal::CompletionQueueTag*>(completion_op_); } void ServerContext::AddInitialMetadata(const grpc::string& key, |