From c0baec60a1ca380425c5514d1df584a63b646305 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 21 Jun 2017 15:45:05 -0700 Subject: Internalize structs and methods meant for being exposed through codegen or that interface with core and are only for internal use --- include/grpc++/impl/codegen/async_stream.h | 443 ++++++++++++--------- include/grpc++/impl/codegen/async_unary_call.h | 66 +-- include/grpc++/impl/codegen/call.h | 10 +- include/grpc++/impl/codegen/call_hook.h | 2 + include/grpc++/impl/codegen/channel_interface.h | 34 +- include/grpc++/impl/codegen/client_context.h | 28 +- include/grpc++/impl/codegen/client_unary_call.h | 4 +- include/grpc++/impl/codegen/completion_queue.h | 56 +-- include/grpc++/impl/codegen/completion_queue_tag.h | 2 + include/grpc++/impl/codegen/metadata_map.h | 2 + include/grpc++/impl/codegen/method_handler_impl.h | 2 + include/grpc++/impl/codegen/rpc_method.h | 3 +- include/grpc++/impl/codegen/rpc_service_method.h | 3 +- include/grpc++/impl/codegen/server_context.h | 27 +- include/grpc++/impl/codegen/server_interface.h | 40 +- include/grpc++/impl/codegen/service_type.h | 46 +-- include/grpc++/impl/codegen/sync_stream.h | 398 ++++++++++-------- include/grpc++/impl/codegen/time.h | 6 +- 18 files changed, 671 insertions(+), 501 deletions(-) (limited to 'include/grpc++/impl') diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 9cf7ac30dd..ddbf3e655e 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -30,6 +30,7 @@ namespace grpc { class CompletionQueue; +namespace internal { /// Common interface for all client side asynchronous streaming. class ClientAsyncStreamingInterface { public: @@ -146,9 +147,41 @@ class AsyncWriterInterface { } }; +} // namespace internal + template -class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface {}; +class ClientAsyncReaderInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncReaderInterface {}; + +/// Common interface for client side asynchronous writing. +template +class ClientAsyncWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; + +/// Async client-side interface for bi-directional streaming, +/// where the client-to-server message stream has messages of type \a W, +/// and the server-to-client message stream has messages of type \a R. +template +class ClientAsyncReaderWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface, + public internal::AsyncReaderInterface { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; /// Async client-side API for doing server-streaming RPCs, /// where the incoming message stream coming from the server has @@ -156,21 +189,24 @@ class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, template class ClientAsyncReader final : public ClientAsyncReaderInterface { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started and - /// \a request has been written out. - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template - static ClientAsyncReader* Create(ChannelInterface* channel, - CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, const W& request, - void* tag) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReader))) - ClientAsyncReader(call, context, request, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started and + /// \a request has been written out. + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template + static ClientAsyncReader* Create(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader))) + ClientAsyncReader(call, context, request, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -218,8 +254,8 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface { private: template - ClientAsyncReader(Call call, ClientContext* context, const W& request, - void* tag) + ClientAsyncReader(::grpc::internal::Call call, ClientContext* context, + const W& request, void* tag) : context_(context), call_(call) { init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_, @@ -231,24 +267,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface { } ClientContext* context_; - Call call_; - CallOpSet + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> init_ops_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet finish_ops_; -}; - -/// Common interface for client side asynchronous writing. -template -class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage> + read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; }; /// Async API on the client side for doing client-streaming RPCs, @@ -257,24 +288,27 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, template class ClientAsyncWriter final : public ClientAsyncWriterInterface { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - /// \a response will be filled in with the single expected response - /// message from the server upon a successful call to the \a Finish - /// method of this instance. - template - static ClientAsyncWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, R* response, - void* tag) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncWriter))) - ClientAsyncWriter(call, context, response, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + /// \a response will be filled in with the single expected response + /// message from the server upon a successful call to the \a Finish + /// method of this instance. + template + static ClientAsyncWriter* Create(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter))) + ClientAsyncWriter(call, context, response, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -338,7 +372,8 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface { private: template - ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag) + ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context, + R* response, void* tag) : context_(context), call_(call) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -356,30 +391,19 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface { } ClientContext* context_; - Call call_; - CallOpSet meta_ops_; - CallOpSet + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> write_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_ops_; }; -/// Async client-side interface for bi-directional streaming, -/// where the client-to-server message stream has messages of type \a W, -/// and the server-to-client message stream has messages of type \a R. -template -class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - /// Async client-side interface for bi-directional streaming, /// where the outgoing message stream going to the server /// has messages of type \a W, and the incoming message stream coming @@ -388,21 +412,23 @@ template class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - static ClientAsyncReaderWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, - const RpcMethod& method, - ClientContext* context, void* tag) { - Call call = channel->CreateCall(method, context, cq); - - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReaderWriter))) - ClientAsyncReaderWriter(call, context, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent). + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + static ClientAsyncReaderWriter* Create( + ::grpc::ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter))) + ClientAsyncReaderWriter(call, context, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -471,7 +497,8 @@ class ClientAsyncReaderWriter final } private: - ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag) + ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context, + void* tag) : context_(context), call_(call) { if (context_->initial_metadata_corked_) { // if corked bit is set in context, we buffer up the initial metadata to @@ -487,17 +514,25 @@ class ClientAsyncReaderWriter final } ClientContext* context_; - Call call_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage> + read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> write_ops_; - CallOpSet finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; }; template -class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, - public AsyncReaderInterface { +class ServerAsyncReaderInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncReaderInterface { public: /// Indicate that the stream is to be finished with a certain status code /// and also send out \a msg response to the client. @@ -541,6 +576,89 @@ class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, virtual void FinishWithError(const Status& status, void* tag) = 0; }; +template +class ServerAsyncWriterInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation with a non-ok + /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if + /// some failure occurred when trying to do so. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish + /// in a single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; +}; + +/// Server-side interface for asynchronous bi-directional streaming. +template +class ServerAsyncReaderWriterInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface, + public internal::AsyncReaderInterface { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation + /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' + /// with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if some + /// failure occurred when trying to do so. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; +}; + /// Async server-side API for doing client-streaming RPCs, /// where the incoming message stream from the client has messages of type \a R, /// and the single response message sent from the server is type \a W. @@ -624,56 +742,19 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage> read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> finish_ops_; }; -template -class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation with a non-ok - /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if - /// some failure occurred when trying to do so. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish - /// in a single step. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, WriteOptions options, - const Status& status, void* tag) = 0; -}; - /// Async server-side API for doing server streaming RPCs, /// where the outgoing message stream from the server has messages of type \a W. template @@ -755,7 +836,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } template void EnsureInitialMetadataSent(T* ops) { @@ -769,55 +850,17 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { } } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> write_ops_; - CallOpSet finish_ops_; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template -class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation - /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' - /// with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if some - /// failure occurred when trying to do so. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish in a - /// single step. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, WriteOptions options, - const Status& status, void* tag) = 0; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; }; /// Async server-side API for doing bidirectional streaming RPCs, @@ -912,7 +955,7 @@ class ServerAsyncReaderWriter final private: friend class ::grpc::Server; - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } template void EnsureInitialMetadataSent(T* ops) { @@ -926,14 +969,18 @@ class ServerAsyncReaderWriter final } } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage> read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> write_ops_; - CallOpSet finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 41b3ae3f28..45a8e8ee6a 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -75,17 +75,18 @@ class ClientAsyncResponseReader final /// intitial metadata sent) and \a request has been written out. /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. - template - static ClientAsyncResponseReader* Create(ChannelInterface* channel, - CompletionQueue* cq, - const RpcMethod& method, - ClientContext* context, - const W& request) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader))) - ClientAsyncResponseReader(call, context, request); - } + struct internal { + template + static ClientAsyncResponseReader* Create( + ::grpc::ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + const W& request) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader))) + ClientAsyncResponseReader(call, context, request); + } + }; /// TODO(vjpai): Delete the below constructor /// PLEASE DO NOT USE THIS CONSTRUCTOR IN NEW CODE @@ -94,9 +95,10 @@ class ClientAsyncResponseReader final /// created this struct rather than properly using a stub. /// This code will not remain a valid public constructor for long. template - ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request) + ClientAsyncResponseReader(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)), collection_(std::make_shared()) { @@ -164,10 +166,11 @@ class ClientAsyncResponseReader final private: ClientContext* const context_; - Call call_; + ::grpc::internal::Call call_; template - ClientAsyncResponseReader(Call call, ClientContext* context, const W& request) + ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context, + const W& request) : context_(context), call_(call) { ops_.init_buf.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); @@ -183,13 +186,17 @@ class ClientAsyncResponseReader final // TODO(vjpai): Remove the reference to CallOpSetCollectionInterface // as soon as the related workaround (public constructor) is deleted - struct Ops : public CallOpSetCollectionInterface { - SneakyCallOpSet + struct Ops : public ::grpc::internal::CallOpSetCollectionInterface { + ::grpc::internal::SneakyCallOpSet< + ::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> init_buf; - CallOpSet meta_buf; - CallOpSet, - CallOpClientRecvStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_buf; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_buf; } ops_; @@ -201,7 +208,8 @@ class ClientAsyncResponseReader final /// Async server-side API for handling unary calls, where the single /// response message sent to the client is of type \a W. template -class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { +class ServerAsyncResponseWriter final + : public internal::ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -289,13 +297,15 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet meta_buf_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_buf_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> finish_buf_; }; diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 342ea46203..32bd2ad5c7 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -44,11 +44,13 @@ struct grpc_byte_buffer; namespace grpc { class ByteBuffer; -class Call; -class CallHook; class CompletionQueue; extern CoreCodegenInterface* g_core_codegen_interface; +namespace internal { +class Call; +class CallHook; + const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin"; // TODO(yangg) if the map is changed before we send, the pointers will be a @@ -76,6 +78,7 @@ inline grpc_metadata* FillMetadataArray( } return metadata_array; } +} // namespace internal /// Per-message write options. class WriteOptions { @@ -191,6 +194,7 @@ class WriteOptions { bool last_message_; }; +namespace internal { /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template @@ -673,7 +677,7 @@ class Call final { grpc_call* call_; int max_receive_message_size_; }; - +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CALL_H diff --git a/include/grpc++/impl/codegen/call_hook.h b/include/grpc++/impl/codegen/call_hook.h index d026cc8b58..44e9de220e 100644 --- a/include/grpc++/impl/codegen/call_hook.h +++ b/include/grpc++/impl/codegen/call_hook.h @@ -21,6 +21,7 @@ namespace grpc { +namespace internal { class CallOpSetInterface; class Call; @@ -31,6 +32,7 @@ class CallHook { virtual ~CallHook() {} virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h index 1b7590bf0c..cf1d77e905 100644 --- a/include/grpc++/impl/codegen/channel_interface.h +++ b/include/grpc++/impl/codegen/channel_interface.h @@ -24,10 +24,8 @@ #include namespace grpc { -class Call; +class ChannelInterface; class ClientContext; -class RpcMethod; -class CallOpSetInterface; class CompletionQueue; template @@ -45,6 +43,16 @@ class ClientAsyncReaderWriter; template class ClientAsyncResponseReader; +namespace internal { +class Call; +class CallOpSetInterface; +class RpcMethod; +template +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal + /// Codegen interface for \a grpc::Channel. class ChannelInterface { public: @@ -96,15 +104,16 @@ class ChannelInterface { template friend class ::grpc::ClientAsyncResponseReader; template - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); - friend class ::grpc::RpcMethod; - virtual Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) = 0; - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); + friend class ::grpc::internal::RpcMethod; + virtual internal::Call CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) = 0; + virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) = 0; virtual void* RegisterMethod(const char* method) = 0; virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, @@ -112,7 +121,6 @@ class ChannelInterface { virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; }; - } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CHANNEL_INTERFACE_H diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index c2a44e41ce..22d0069afa 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -60,7 +60,18 @@ class Channel; class ChannelInterface; class CompletionQueue; class CallCredentials; +class ClientContext; + +namespace internal { class RpcMethod; +class CallOpClientRecvStatus; +class CallOpRecvInitialMetadata; +template +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal + template class ClientReader; template @@ -345,8 +356,8 @@ class ClientContext { ClientContext& operator=(const ClientContext&); friend class ::grpc::testing::InteropClientContextInspector; - friend class CallOpClientRecvStatus; - friend class CallOpRecvInitialMetadata; + friend class ::grpc::internal::CallOpClientRecvStatus; + friend class ::grpc::internal::CallOpRecvInitialMetadata; friend class Channel; template friend class ::grpc::ClientReader; @@ -363,11 +374,10 @@ class ClientContext { template friend class ::grpc::ClientAsyncResponseReader; template - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); grpc_call* call() const { return call_; } void set_call(grpc_call* call, const std::shared_ptr& channel); @@ -399,8 +409,8 @@ class ClientContext { mutable std::shared_ptr auth_context_; struct census_context* census_context_; std::multimap send_initial_metadata_; - MetadataMap recv_initial_metadata_; - MetadataMap trailing_metadata_; + internal::MetadataMap recv_initial_metadata_; + internal::MetadataMap trailing_metadata_; grpc_call* propagate_from_call_; PropagationOptions propagation_options_; diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 7c540fade9..8fef3ab353 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -30,8 +30,9 @@ namespace grpc { class Channel; class ClientContext; class CompletionQueue; -class RpcMethod; +namespace internal { +class RpcMethod; /// Wrapper that performs a blocking unary call template Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, @@ -67,6 +68,7 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, return status; } +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index ca757e2a9c..a04778aa72 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -56,7 +56,19 @@ class ServerWriter; namespace internal { template class ServerReaderWriterBody; -} +} // namespace internal + +class Channel; +class ChannelInterface; +class ClientContext; +class CompletionQueue; +class Server; +class ServerBuilder; +class ServerContext; + +namespace internal { +class CompletionQueueTag; +class RpcMethod; template class RpcMethodHandler; template @@ -66,16 +78,13 @@ class ServerStreamingHandler; template class BidiStreamingHandler; class UnknownMethodHandler; - -class Channel; -class ChannelInterface; -class ClientContext; -class CompletionQueueTag; -class CompletionQueue; -class RpcMethod; -class Server; -class ServerBuilder; -class ServerContext; +template +class TemplatedBidiStreamingHandler; +template +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal extern CoreCodegenInterface* g_core_codegen_interface; @@ -196,28 +205,27 @@ class CompletionQueue : private GrpcLibraryCodegen { template friend class ::grpc::internal::ServerReaderWriterBody; template - friend class RpcMethodHandler; + friend class ::grpc::internal::RpcMethodHandler; template - friend class ClientStreamingHandler; + friend class ::grpc::internal::ClientStreamingHandler; template - friend class ServerStreamingHandler; + friend class ::grpc::internal::ServerStreamingHandler; template - friend class TemplatedBidiStreamingHandler; - friend class UnknownMethodHandler; + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; template - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. /// \warning Must not be mixed with calls to \a Next. - bool Pluck(CompletionQueueTag* tag) { + bool Pluck(internal::CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( @@ -238,7 +246,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// implementation to simple call the other TryPluck function with a zero /// timeout. i.e: /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) - void TryPluck(CompletionQueueTag* tag) { + void TryPluck(internal::CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); @@ -254,7 +262,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects /// that the tag is internal not something that is returned to the user. - void TryPluck(CompletionQueueTag* tag, gpr_timespec deadline) { + void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) { auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { diff --git a/include/grpc++/impl/codegen/completion_queue_tag.h b/include/grpc++/impl/codegen/completion_queue_tag.h index 4d7d3a98dd..cb16bcf9ff 100644 --- a/include/grpc++/impl/codegen/completion_queue_tag.h +++ b/include/grpc++/impl/codegen/completion_queue_tag.h @@ -21,6 +21,7 @@ namespace grpc { +namespace internal { /// An interface allowing implementors to process and filter event tags. class CompletionQueueTag { public: @@ -31,6 +32,7 @@ class CompletionQueueTag { /// queue virtual bool FinalizeResult(void** tag, bool* status) = 0; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/metadata_map.h b/include/grpc++/impl/codegen/metadata_map.h index b73985967d..fd4750efdd 100644 --- a/include/grpc++/impl/codegen/metadata_map.h +++ b/include/grpc++/impl/codegen/metadata_map.h @@ -23,6 +23,7 @@ namespace grpc { +namespace internal { class MetadataMap { public: MetadataMap() { memset(&arr_, 0, sizeof(arr_)); } @@ -50,6 +51,7 @@ class MetadataMap { grpc_metadata_array arr_; std::multimap map_; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 15e24bdcdc..87e9e5e952 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -25,6 +25,7 @@ namespace grpc { +namespace internal { /// A wrapper class of an application provided rpc method handler. template class RpcMethodHandler : public MethodHandler { @@ -265,6 +266,7 @@ class UnknownMethodHandler : public MethodHandler { } }; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index ac13ac56c7..54e52364ef 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -24,7 +24,7 @@ #include namespace grpc { - +namespace internal { /// Descriptor of an RPC method class RpcMethod { public: @@ -55,6 +55,7 @@ class RpcMethod { void* const channel_tag_; }; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h index 7165774172..635e40469b 100644 --- a/include/grpc++/impl/codegen/rpc_service_method.h +++ b/include/grpc++/impl/codegen/rpc_service_method.h @@ -35,8 +35,8 @@ struct grpc_byte_buffer; namespace grpc { class ServerContext; -class StreamContextInterface; +namespace internal { /// Base class for running an RPC handler. class MethodHandler { public: @@ -71,6 +71,7 @@ class RpcServiceMethod : public RpcMethod { void* server_tag_; std::unique_ptr handler_; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index b5e37fd12b..a2d6967bf8 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -55,7 +55,6 @@ class ServerWriter; namespace internal { template class ServerReaderWriterBody; -} template class RpcMethodHandler; template @@ -65,9 +64,11 @@ class ServerStreamingHandler; template class BidiStreamingHandler; class UnknownMethodHandler; - +template +class TemplatedBidiStreamingHandler; class Call; -class CallOpBuffer; +} // namespace internal + class CompletionQueue; class Server; class ServerInterface; @@ -247,14 +248,14 @@ class ServerContext { template friend class ::grpc::internal::ServerReaderWriterBody; template - friend class RpcMethodHandler; + friend class ::grpc::internal::RpcMethodHandler; template - friend class ClientStreamingHandler; + friend class ::grpc::internal::ClientStreamingHandler; template - friend class ServerStreamingHandler; + friend class ::grpc::internal::ServerStreamingHandler; template - friend class TemplatedBidiStreamingHandler; - friend class UnknownMethodHandler; + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. @@ -263,9 +264,9 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(Call* call); + void BeginCompletionOp(internal::Call* call); /// Return the tag queued by BeginCompletionOp() - CompletionQueueTag* GetCompletionOpTag(); + internal::CompletionQueueTag* GetCompletionOpTag(); ServerContext(gpr_timespec deadline, grpc_metadata_array* arr); @@ -282,7 +283,7 @@ class ServerContext { CompletionQueue* cq_; bool sent_initial_metadata_; mutable std::shared_ptr auth_context_; - MetadataMap client_metadata_; + internal::MetadataMap client_metadata_; std::multimap initial_metadata_; std::multimap trailing_metadata_; @@ -290,7 +291,9 @@ class ServerContext { grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; - CallOpSet pending_ops_; + internal::CallOpSet + pending_ops_; bool has_pending_ops_; }; diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 87bd085a37..9d120031ca 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -29,20 +29,21 @@ namespace grpc { class AsyncGenericService; class GenericServerContext; -class RpcService; -class ServerAsyncStreamingInterface; class ServerCompletionQueue; class ServerContext; class ServerCredentials; class Service; -class ThreadPoolInterface; extern CoreCodegenInterface* g_core_codegen_interface; /// Models a gRPC server. /// /// Servers are configured and started via \a grpc::ServerBuilder. -class ServerInterface : public CallHook { +namespace internal { +class ServerAsyncStreamingInterface; +} // namespace internal + +class ServerInterface : public internal::CallHook { public: virtual ~ServerInterface() {} @@ -77,7 +78,7 @@ class ServerInterface : public CallHook { virtual void Wait() = 0; protected: - friend class Service; + friend class ::grpc::Service; /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -115,12 +116,13 @@ class ServerInterface : public CallHook { virtual grpc_server* server() = 0; - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) = 0; - class BaseAsyncRequest : public CompletionQueueTag { + class BaseAsyncRequest : public internal::CompletionQueueTag { public: BaseAsyncRequest(ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, bool delete_on_finalize); virtual ~BaseAsyncRequest(); @@ -130,7 +132,7 @@ class ServerInterface : public CallHook { protected: ServerInterface* const server_; ServerContext* const context_; - ServerAsyncStreamingInterface* const stream_; + internal::ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; void* const tag_; const bool delete_on_finalize_; @@ -140,7 +142,7 @@ class ServerInterface : public CallHook { class RegisteredAsyncRequest : public BaseAsyncRequest { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag); // uses BaseAsyncRequest::FinalizeResult @@ -154,7 +156,7 @@ class ServerInterface : public CallHook { public: NoPayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { @@ -169,7 +171,7 @@ class ServerInterface : public CallHook { public: PayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* request) @@ -195,7 +197,7 @@ class ServerInterface : public CallHook { class GenericAsyncRequest : public BaseAsyncRequest { public: GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize); @@ -207,8 +209,9 @@ class ServerInterface : public CallHook { }; template - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(internal::RpcServiceMethod* method, + ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* message) { @@ -218,8 +221,9 @@ class ServerInterface : public CallHook { message); } - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(internal::RpcServiceMethod* method, + ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { GPR_CODEGEN_ASSERT(method); @@ -228,7 +232,7 @@ class ServerInterface : public CallHook { } void RequestAsyncGenericCall(GenericServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index 2dc4ea0ea6..71c3d99d5c 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -28,13 +28,14 @@ namespace grpc { -class Call; class CompletionQueue; class Server; class ServerInterface; class ServerCompletionQueue; class ServerContext; +namespace internal { +class Call; class ServerAsyncStreamingInterface { public: virtual ~ServerAsyncStreamingInterface() {} @@ -48,9 +49,10 @@ class ServerAsyncStreamingInterface { virtual void SendInitialMetadata(void* tag) = 0; private: - friend class ServerInterface; + friend class ::grpc::ServerInterface; virtual void BindCall(Call* call) = 0; }; +} // namespace internal /// Desriptor of an RPC service and its various RPC methods class Service { @@ -88,40 +90,38 @@ class Service { protected: template void RequestAsyncUnary(int index, ServerContext* context, Message* request, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncClientStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncClientStreaming( + int index, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } template - void RequestAsyncServerStreaming(int index, ServerContext* context, - Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncServerStreaming( + int index, ServerContext* context, Message* request, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncBidiStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncBidiStreaming( + int index, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } - void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); } + void AddMethod(internal::RpcServiceMethod* method) { + methods_.emplace_back(method); + } void MarkMethodAsync(int index) { GPR_CODEGEN_ASSERT( @@ -139,7 +139,7 @@ class Service { methods_[index].reset(); } - void MarkMethodStreamed(int index, MethodHandler* streamed_method) { + void MarkMethodStreamed(int index, internal::MethodHandler* streamed_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && "Cannot mark an async or generic method Streamed"); methods_[index]->SetHandler(streamed_method); @@ -148,14 +148,14 @@ class Service { // case of BIDI_STREAMING that has 1 read and 1 write, in that order, // and split server-side streaming is BIDI_STREAMING with 1 read and // any number of writes, in that order. - methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING); + methods_[index]->SetMethodType(internal::RpcMethod::BIDI_STREAMING); } private: friend class Server; friend class ServerInterface; ServerInterface* server_; - std::vector> methods_; + std::vector> methods_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 3fa208963d..f9c8303000 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -30,6 +30,7 @@ namespace grpc { +namespace internal { /// Common interface for all synchronous client side streaming. class ClientStreamingInterface { public: @@ -62,20 +63,6 @@ class ClientStreamingInterface { virtual Status Finish() = 0; }; -/// Common interface for all synchronous server side streaming. -class ServerStreamingInterface { - public: - virtual ~ServerStreamingInterface() {} - - /// Block to send initial metadata to client. - /// This call is optional, but if it is used, it cannot be used concurrently - /// with or after the \a Finish method. - /// - /// The initial metadata that will be sent to the client will be - /// taken from the \a ServerContext associated with the call. - virtual void SendInitialMetadata() = 0; -}; - /// An interface that yields a sequence of messages of type \a R. template class ReaderInterface { @@ -141,16 +128,55 @@ class WriterInterface { } }; +} // namespace internal + /// Client-side interface for streaming reads of message of type \a R. template -class ClientReaderInterface : public ClientStreamingInterface, - public ReaderInterface { +class ClientReaderInterface : public internal::ClientStreamingInterface, + public internal::ReaderInterface { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; +}; + +/// Client-side interface for streaming writes of message type \a W. +template +class ClientWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface { + public: + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread safe with respect to \a ReaderInterface::Read operations only + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +/// Client-side interface for bi-directional streaming with +/// client-to-server stream messages of type \a W and +/// server-to-client stream messages of type \a R. +template +class ClientReaderWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface, + public internal::ReaderInterface { public: /// Block to wait for initial metadata from server. The received metadata /// can only be accessed after this call returns. Should only be called before /// the first read. Calling this method is optional, and if it is not called /// the metadata will be available in ClientContext after the first read. virtual void WaitForInitialMetadata() = 0; + + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread-safe with respect to \a ReaderInterface::Read + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; }; /// Synchronous (blocking) client-side API for doing server-streaming RPCs, @@ -159,28 +185,14 @@ class ClientReaderInterface : public ClientStreamingInterface, template class ClientReader final : public ClientReaderInterface { public: - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial - /// metadata used to send to the server when starting the call. - template - ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet - ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } + struct internal { + template + static ClientReader* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) { + return new ClientReader(channel, method, context, request); + } + }; /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -192,7 +204,8 @@ class ClientReader final : public ClientReaderInterface { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); /// status ignored @@ -209,7 +222,9 @@ class ClientReader final : public ClientReaderInterface { /// already received (if initial metadata is received, it can be then /// accessed through the \a ClientContext associated with this call). bool Read(R* msg) override { - CallOpSet> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -224,7 +239,7 @@ class ClientReader final : public ClientReaderInterface { /// The \a ClientContext associated with this call is updated with /// possible metadata received from the server. Status Finish() override { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); @@ -235,53 +250,48 @@ class ClientReader final : public ClientReaderInterface { private: ClientContext* context_; CompletionQueue cq_; - Call call_; -}; + ::grpc::internal::Call call_; -/// Client-side interface for streaming writes of message type \a W. -template -class ClientWriterInterface : public ClientStreamingInterface, - public WriterInterface { - public: - /// Half close writing from the client. (signal that the stream of messages - /// coming from the clinet is complete). - /// Blocks until currently-pending writes are completed. - /// Thread safe with respect to \a ReaderInterface::Read operations only - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial + /// metadata used to send to the server when starting the call. + template + ClientReader(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } }; /// Synchronous (blocking) client-side API for doing client-streaming RPCs, /// where the outgoing message stream coming from the client has messages of /// type \a W. template -class ClientWriter : public ClientWriterInterface { +class ClientWriter final : public ClientWriterInterface { public: - /// Block to create a stream (i.e. send request headers and other initial - /// metadata to the server). Note that \a context will be used to fill - /// in custom initial metadata. \a response will be filled in with the - /// single expected response message from the server upon a successful - /// call to the \a Finish method of this instance. - template - ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, R* response) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - - if (!context_->initial_metadata_corked_) { - CallOpSet ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + struct internal { + template + static ClientWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) { + return new ClientWriter(channel, method, context, response); } - } + }; /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -292,7 +302,8 @@ class ClientWriter : public ClientWriterInterface { void WaitForInitialMetadata() { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -304,10 +315,11 @@ class ClientWriter : public ClientWriterInterface { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call). - using WriterInterface::Write; + using ::grpc::internal::WriterInterface::Write; bool Write(const W& msg, WriteOptions options) override { - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -328,7 +340,7 @@ class ClientWriter : public ClientWriterInterface { } bool WritesDone() override { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -353,61 +365,55 @@ class ClientWriter : public ClientWriterInterface { private: ClientContext* context_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_ops_; CompletionQueue cq_; - Call call_; -}; + ::grpc::internal::Call call_; -/// Client-side interface for bi-directional streaming with -/// client-to-server stream messages of type \a W and -/// server-to-client stream messages of type \a R. -template -class ClientReaderWriterInterface : public ClientStreamingInterface, - public WriterInterface, - public ReaderInterface { - public: - /// Block to wait for initial metadata from server. The received metadata - /// can only be accessed after this call returns. Should only be called before - /// the first read. Calling this method is optional, and if it is not called - /// the metadata will be available in ClientContext after the first read. - virtual void WaitForInitialMetadata() = 0; - - /// Half close writing from the client. (signal that the stream of messages - /// coming from the clinet is complete). - /// Blocks until currently-pending writes are completed. - /// Thread-safe with respect to \a ReaderInterface::Read - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; -}; - -/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, -/// where the outgoing message stream coming from the client has messages of -/// type \a W, and the incoming messages stream coming from the server has -/// messages of type \a R. -template -class ClientReaderWriter final : public ClientReaderWriterInterface { - public: - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context) + /// Block to create a stream (i.e. send request headers and other initial + /// metadata to the server). Note that \a context will be used to fill + /// in custom initial metadata. \a response will be filled in with the + /// single expected response message from the server upon a successful + /// call to the \a Finish method of this instance. + template + ClientWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) : context_(context), cq_(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + if (!context_->initial_metadata_corked_) { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; ops.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); call_.PerformOps(&ops); cq_.Pluck(&ops); } } +}; + +/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W, and the incoming messages stream coming from the server has +/// messages of type \a R. +template +class ClientReaderWriter final : public ClientReaderWriterInterface { + public: + struct internal { + static ClientReaderWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) { + return new ClientReaderWriter(channel, method, context); + } + }; /// Block waiting to read initial metadata from the server. /// This call is optional, but if it is used, it cannot be used concurrently @@ -418,7 +424,8 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -434,7 +441,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { /// Also receives initial metadata if not already received (updates the \a /// ClientContext associated with this call in that case). bool Read(R* msg) override { - CallOpSet> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -448,10 +457,11 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using WriterInterface::Write; + using ::grpc::internal::WriterInterface::Write; bool Write(const W& msg, WriteOptions options) override { - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -472,7 +482,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { } bool WritesDone() override { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -484,7 +494,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { /// - the \a ClientContext associated with this call is updated with /// possible trailing metadata sent from the server. Status Finish() override { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -498,13 +510,61 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { private: ClientContext* context_; CompletionQueue cq_; - Call call_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + ClientReaderWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } }; +namespace internal { +/// Common interface for all synchronous server side streaming. +class ServerStreamingInterface { + public: + virtual ~ServerStreamingInterface() {} + + /// Block to send initial metadata to client. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// The initial metadata that will be sent to the client will be + /// taken from the \a ServerContext associated with the call. + virtual void SendInitialMetadata() = 0; +}; +} // namespace internal + /// Server-side interface for streaming reads of message of type \a R. template -class ServerReaderInterface : public ServerStreamingInterface, - public ReaderInterface {}; +class ServerReaderInterface : public internal::ServerStreamingInterface, + public internal::ReaderInterface {}; + +/// Server-side interface for streaming writes of message of type \a W. +template +class ServerWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface {}; + +/// Server-side interface for bi-directional streaming. +template +class ServerReaderWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface, + public internal::ReaderInterface {}; /// Synchronous (blocking) server-side API for doing client-streaming RPCs, /// where the incoming message stream coming from the client has messages of @@ -512,15 +572,13 @@ class ServerReaderInterface : public ServerStreamingInterface, template class ServerReader final : public ServerReaderInterface { public: - ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - CallOpSet ops; + internal::CallOpSet ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -537,21 +595,22 @@ class ServerReader final : public ServerReaderInterface { } bool Read(R* msg) override { - CallOpSet> ops; + internal::CallOpSet> ops; ops.RecvMessage(msg); call_->PerformOps(&ops); return call_->cq()->Pluck(&ops) && ops.got_message; } private: - Call* const call_; + internal::Call* const call_; ServerContext* const ctx_; -}; -/// Server-side interface for streaming writes of message of type \a W. -template -class ServerWriterInterface : public ServerStreamingInterface, - public WriterInterface {}; + template + friend class internal::ClientStreamingHandler; + + ServerReader(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; /// Synchronous (blocking) server-side API for doing for doing a /// server-streaming RPCs, where the outgoing message stream coming from the @@ -559,8 +618,6 @@ class ServerWriterInterface : public ServerStreamingInterface, template class ServerWriter final : public ServerWriterInterface { public: - ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. /// Note that initial metadata will be affected by the @@ -568,7 +625,7 @@ class ServerWriter final : public ServerWriterInterface { void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - CallOpSet ops; + internal::CallOpSet ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -584,11 +641,12 @@ class ServerWriter final : public ServerWriterInterface { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using WriterInterface::Write; + using internal::WriterInterface::Write; bool Write(const W& msg, WriteOptions options) override { if (options.is_last_message()) { options.set_buffer_hint(); } + if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { return false; } @@ -613,15 +671,15 @@ class ServerWriter final : public ServerWriterInterface { } private: - Call* const call_; + internal::Call* const call_; ServerContext* const ctx_; -}; -/// Server-side interface for bi-directional streaming. -template -class ServerReaderWriterInterface : public ServerStreamingInterface, - public WriterInterface, - public ReaderInterface {}; + template + friend class internal::ServerStreamingHandler; + + ServerWriter(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; /// Actual implementation of bi-directional streaming namespace internal { @@ -688,6 +746,7 @@ class ServerReaderWriterBody final { Call* const call_; ServerContext* const ctx_; }; + } // namespace internal /// Synchronous (blocking) server-side API for a bidirectional @@ -697,8 +756,6 @@ class ServerReaderWriterBody final { template class ServerReaderWriter final : public ServerReaderWriterInterface { public: - ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. @@ -715,13 +772,18 @@ class ServerReaderWriter final : public ServerReaderWriterInterface { /// Side effect: /// Also sends initial metadata if not already sent (using the \a /// ServerContext associated with this call). - using WriterInterface::Write; + using internal::WriterInterface::Write; bool Write(const W& msg, WriteOptions options) override { return body_.Write(msg, options); } private: internal::ServerReaderWriterBody body_; + + friend class internal::TemplatedBidiStreamingHandler, + false>; + ServerReaderWriter(internal::Call* call, ServerContext* ctx) + : body_(call, ctx) {} }; /// A class to represent a flow-controlled unary call. This is something @@ -736,9 +798,6 @@ template class ServerUnaryStreamer final : public ServerReaderWriterInterface { public: - ServerUnaryStreamer(Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false), write_done_(false) {} - /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -775,7 +834,7 @@ class ServerUnaryStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using WriterInterface::Write; + using internal::WriterInterface::Write; bool Write(const ResponseType& response, WriteOptions options) override { if (write_done_ || !read_done_) { return false; @@ -788,6 +847,11 @@ class ServerUnaryStreamer final internal::ServerReaderWriterBody body_; bool read_done_; bool write_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerUnaryStreamer, true>; + ServerUnaryStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false), write_done_(false) {} }; /// A class to represent a flow-controlled server-side streaming call. @@ -799,9 +863,6 @@ template class ServerSplitStreamer final : public ServerReaderWriterInterface { public: - ServerSplitStreamer(Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false) {} - /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -838,7 +899,7 @@ class ServerSplitStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using WriterInterface::Write; + using internal::WriterInterface::Write; bool Write(const ResponseType& response, WriteOptions options) override { return read_done_ && body_.Write(response, options); } @@ -846,6 +907,11 @@ class ServerSplitStreamer final private: internal::ServerReaderWriterBody body_; bool read_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerSplitStreamer, false>; + ServerSplitStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/time.h b/include/grpc++/impl/codegen/time.h index 589deb4f03..d464d6ea13 100644 --- a/include/grpc++/impl/codegen/time.h +++ b/include/grpc++/impl/codegen/time.h @@ -19,6 +19,8 @@ #ifndef GRPCXX_IMPL_CODEGEN_TIME_H #define GRPCXX_IMPL_CODEGEN_TIME_H +#include + #include #include @@ -59,10 +61,6 @@ class TimePoint { } // namespace grpc -#include - -#include - namespace grpc { // from and to should be absolute time. -- cgit v1.2.3