diff options
author | Vijay Pai <vpai@google.com> | 2017-07-25 00:43:30 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-25 00:43:30 -0700 |
commit | 6abd20020b99514af9df381c72b509aeb88177a4 (patch) | |
tree | 57f023ebf29d0bf15c3288e1b4d7489bdde5b7d4 /src/cpp | |
parent | 2c12d506fa8113bf3c4ce5949c762a5a8d9e8ee5 (diff) |
Revert "Separate internal-only and public parts of C++ API"
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 12 | ||||
-rw-r--r-- | src/cpp/client/generic_stub.cc | 6 | ||||
-rw-r--r-- | src/cpp/common/completion_queue_cc.cc | 2 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.cc | 9 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.h | 2 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 69 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 8 |
7 files changed, 49 insertions, 59 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 038eb32e04..f2d9bb07c9 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -76,9 +76,8 @@ grpc::string Channel::GetServiceConfigJSON() const { &channel_info.service_config_json); } -internal::Call Channel::CreateCall(const internal::RpcMethod& method, - ClientContext* context, - CompletionQueue* cq) { +Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, + CompletionQueue* cq) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = NULL; if (kRegistered) { @@ -110,11 +109,10 @@ internal::Call Channel::CreateCall(const internal::RpcMethod& method, } grpc_census_call_set_context(c_call, context->census_context()); context->set_call(c_call, shared_from_this()); - return internal::Call(c_call, this, cq); + return Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) { +void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -133,7 +131,7 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { } namespace { -class TagSaver final : public internal::CompletionQueueTag { +class TagSaver final : public CompletionQueueTag { public: explicit TagSaver(void* tag) : tag_(tag) {} ~TagSaver() override {} diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index e65cb9903f..66b1ef0e39 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -27,11 +27,9 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { return std::unique_ptr<GenericClientAsyncReaderWriter>( - GenericClientAsyncReaderWriter::internal::Create( + GenericClientAsyncReaderWriter::Create( channel_.get(), cq, - internal::RpcMethod(method.c_str(), - internal::RpcMethod::BIDI_STREAMING), - context, tag)); + RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); } } // namespace grpc diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 000a03277b..f34b0f3d58 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); + auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; *tag = cq_tag; if (cq_tag->FinalizeResult(tag, ok)) { diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index fc7feb79fe..815b607032 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -36,12 +36,11 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( DefaultHealthCheckService* service) : service_(service), method_(nullptr) { - internal::MethodHandler* handler = - new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, - ByteBuffer>( + MethodHandler* handler = + new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>( std::mem_fn(&HealthCheckServiceImpl::Check), this); - method_ = new internal::RpcServiceMethod( - kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); + method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC, + handler); AddMethod(method_); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 99d6680c50..09d5cebe98 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { private: const DefaultHealthCheckService* const service_; - internal::RpcServiceMethod* method_; + RpcServiceMethod* method_; }; DefaultHealthCheckService(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 60e067d89d..92bacbe061 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -88,8 +88,7 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata, - internal::CallOpServerSendStatus> +typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> UnimplementedAsyncResponseOp; class Server::UnimplementedAsyncResponse final : public UnimplementedAsyncResponseOp { @@ -107,12 +106,12 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public internal::CompletionQueueTag { +class ShutdownTag : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } }; -class DummyTag : public internal::CompletionQueueTag { +class DummyTag : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { *status = true; @@ -120,15 +119,15 @@ class DummyTag : public internal::CompletionQueueTag { } }; -class Server::SyncRequest final : public internal::CompletionQueueTag { +class Server::SyncRequest final : public CompletionQueueTag { public: - SyncRequest(internal::RpcServiceMethod* method, void* tag) + SyncRequest(RpcServiceMethod* method, void* tag) : method_(method), tag_(tag), in_flight_(false), - has_request_payload_( - method->method_type() == internal::RpcMethod::NORMAL_RPC || - method->method_type() == internal::RpcMethod::SERVER_STREAMING), + has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || + method->method_type() == + RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -205,14 +204,14 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_payload_)); + method_->handler()->RunHandler( + MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; cq_.Shutdown(); - internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ @@ -222,15 +221,15 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { private: CompletionQueue cq_; - internal::Call call_; + Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - internal::RpcServiceMethod* const method_; + RpcServiceMethod* const method_; }; private: - internal::RpcServiceMethod* const method_; + RpcServiceMethod* const method_; void* const tag_; bool in_flight_; const bool has_request_payload_; @@ -303,15 +302,14 @@ class Server::SyncRequestThreadManager : public ThreadManager { // object } - void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { + void AddSyncMethod(RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new internal::RpcServiceMethod( - "unknown", internal::RpcMethod::BIDI_STREAMING, - new internal::UnknownMethodHandler)); + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -348,8 +346,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<internal::RpcServiceMethod> unknown_method_; - std::unique_ptr<internal::RpcServiceMethod> health_check_; + std::unique_ptr<RpcServiceMethod> unknown_method_; + std::unique_ptr<RpcServiceMethod> health_check_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -432,13 +430,13 @@ std::shared_ptr<Channel> Server::InProcessChannel( } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - internal::RpcServiceMethod* method) { + RpcServiceMethod* method) { switch (method->method_type()) { - case internal::RpcMethod::NORMAL_RPC: - case internal::RpcMethod::SERVER_STREAMING: + case RpcMethod::NORMAL_RPC: + case RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case internal::RpcMethod::CLIENT_STREAMING: - case internal::RpcMethod::BIDI_STREAMING: + case RpcMethod::CLIENT_STREAMING: + case RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -459,7 +457,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { continue; } - internal::RpcServiceMethod* method = it->get(); + RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); @@ -599,8 +597,7 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) { +void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -611,8 +608,8 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - void* tag, bool delete_on_finalize) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, + bool delete_on_finalize) : server_(server), context_(context), stream_(stream), @@ -634,8 +631,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } context_->set_call(call_); context_->cq_ = call_cq_; - internal::Call call(call_, server_, call_cq_, - server_->max_receive_message_size()); + Call call(call_, server_, call_cq_, server_->max_receive_message_size()); if (*status && call_) { context_->BeginCompletionOp(&call); } @@ -650,8 +646,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( @@ -665,7 +660,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest( ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : BaseAsyncRequest(server, context, stream, call_cq, tag, delete_on_finalize) { @@ -707,7 +702,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { Status status(StatusCode::UNIMPLEMENTED, ""); - internal::UnknownMethodHandler::FillOps(request_->context(), this); + UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 2e55ffbac4..4913682f1d 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -37,7 +37,7 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp final : public internal::CallOpSetInterface { +class ServerContext::CompletionOp final : public CallOpSetInterface { public: // initial refs: one in the server context, one in the cq CompletionOp() @@ -146,7 +146,7 @@ ServerContext::~ServerContext() { } } -void ServerContext::BeginCompletionOp(internal::Call* call) { +void ServerContext::BeginCompletionOp(Call* call) { GPR_ASSERT(!completion_op_); completion_op_ = new CompletionOp(); if (has_notify_when_done_tag_) { @@ -155,8 +155,8 @@ void ServerContext::BeginCompletionOp(internal::Call* call) { call->PerformOps(completion_op_); } -internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() { - return static_cast<internal::CompletionQueueTag*>(completion_op_); +CompletionQueueTag* ServerContext::GetCompletionOpTag() { + return static_cast<CompletionQueueTag*>(completion_op_); } void ServerContext::AddInitialMetadata(const grpc::string& key, |