diff options
author | Vijay Pai <vpai@google.com> | 2017-06-21 15:45:05 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2017-07-10 14:33:53 -0700 |
commit | c0baec60a1ca380425c5514d1df584a63b646305 (patch) | |
tree | 064d6aa900b778640e4ce6493fb9e596c1e3ebad /include | |
parent | 6c1418e1dd7f8fe770531bcc9ac821fbf1ae5f1b (diff) |
Internalize structs and methods meant for being exposed through codegen
or that interface with core and are only for internal use
Diffstat (limited to 'include')
22 files changed, 681 insertions, 509 deletions
diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h index ed8dacbc94..00a9306d9d 100644 --- a/include/grpc++/alarm.h +++ b/include/grpc++/alarm.h @@ -77,7 +77,7 @@ class Alarm : private GrpcLibraryCodegen { void Cancel() { grpc_alarm_cancel(alarm_); } private: - class AlarmEntry : public CompletionQueueTag { + class AlarmEntry : public internal::CompletionQueueTag { public: AlarmEntry(void* tag) : tag_(tag) {} bool FinalizeResult(void** tag, bool* status) override { diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index c50091d6ac..5ba3c591f0 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -32,7 +32,7 @@ struct grpc_channel; namespace grpc { /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, - public CallHook, + public internal::CallHook, public std::enable_shared_from_this<Channel>, private GrpcLibraryCodegen { public: @@ -52,7 +52,7 @@ class Channel final : public ChannelInterface, private: template <class InputMessage, class OutputMessage> friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, + const internal::RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result); @@ -60,9 +60,11 @@ class Channel final : public ChannelInterface, const grpc::string& host, grpc_channel* c_channel); Channel(const grpc::string& host, grpc_channel* c_channel); - Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) override; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; + internal::Call CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) override; + void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) override; void* RegisterMethod(const char* method) override; void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 9cf7ac30dd..ddbf3e655e 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -30,6 +30,7 @@ namespace grpc { class CompletionQueue; +namespace internal { /// Common interface for all client side asynchronous streaming. class ClientAsyncStreamingInterface { public: @@ -146,9 +147,41 @@ class AsyncWriterInterface { } }; +} // namespace internal + template <class R> -class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface<R> {}; +class ClientAsyncReaderInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncReaderInterface<R> {}; + +/// Common interface for client side asynchronous writing. +template <class W> +class ClientAsyncWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface<W> { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; + +/// Async client-side interface for bi-directional streaming, +/// where the client-to-server message stream has messages of type \a W, +/// and the server-to-client message stream has messages of type \a R. +template <class W, class R> +class ClientAsyncReaderWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface<W>, + public internal::AsyncReaderInterface<R> { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; /// Async client-side API for doing server-streaming RPCs, /// where the incoming message stream coming from the server has @@ -156,21 +189,24 @@ class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, template <class R> class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started and - /// \a request has been written out. - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template <class W> - static ClientAsyncReader* Create(ChannelInterface* channel, - CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, const W& request, - void* tag) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReader))) - ClientAsyncReader(call, context, request, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started and + /// \a request has been written out. + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template <class W> + static ClientAsyncReader* Create(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader))) + ClientAsyncReader(call, context, request, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -218,8 +254,8 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { private: template <class W> - ClientAsyncReader(Call call, ClientContext* context, const W& request, - void* tag) + ClientAsyncReader(::grpc::internal::Call call, ClientContext* context, + const W& request, void* tag) : context_(context), call_(call) { init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_, @@ -231,24 +267,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { } ClientContext* context_; - Call call_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; -}; - -/// Common interface for client side asynchronous writing. -template <class W> -class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface<W> { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; }; /// Async API on the client side for doing client-streaming RPCs, @@ -257,24 +288,27 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, template <class W> class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - /// \a response will be filled in with the single expected response - /// message from the server upon a successful call to the \a Finish - /// method of this instance. - template <class R> - static ClientAsyncWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, R* response, - void* tag) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncWriter))) - ClientAsyncWriter(call, context, response, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + /// \a response will be filled in with the single expected response + /// message from the server upon a successful call to the \a Finish + /// method of this instance. + template <class R> + static ClientAsyncWriter* Create(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter))) + ClientAsyncWriter(call, context, response, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -338,7 +372,8 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { private: template <class R> - ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag) + ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context, + R* response, void* tag) : context_(context), call_(call) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -356,31 +391,20 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { } ClientContext* context_; - Call call_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> write_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_ops_; }; /// Async client-side interface for bi-directional streaming, -/// where the client-to-server message stream has messages of type \a W, -/// and the server-to-client message stream has messages of type \a R. -template <class W, class R> -class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -/// Async client-side interface for bi-directional streaming, /// where the outgoing message stream going to the server /// has messages of type \a W, and the incoming message stream coming /// from the server has messages of type \a R. @@ -388,21 +412,23 @@ template <class W, class R> class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface<W, R> { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - static ClientAsyncReaderWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, - const RpcMethod& method, - ClientContext* context, void* tag) { - Call call = channel->CreateCall(method, context, cq); - - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReaderWriter))) - ClientAsyncReaderWriter(call, context, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent). + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + static ClientAsyncReaderWriter* Create( + ::grpc::ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter))) + ClientAsyncReaderWriter(call, context, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -471,7 +497,8 @@ class ClientAsyncReaderWriter final } private: - ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag) + ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context, + void* tag) : context_(context), call_(call) { if (context_->initial_metadata_corked_) { // if corked bit is set in context, we buffer up the initial metadata to @@ -487,17 +514,25 @@ class ClientAsyncReaderWriter final } ClientContext* context_; - Call call_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> write_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; }; template <class W, class R> -class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, - public AsyncReaderInterface<R> { +class ServerAsyncReaderInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncReaderInterface<R> { public: /// Indicate that the stream is to be finished with a certain status code /// and also send out \a msg response to the client. @@ -541,6 +576,89 @@ class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, virtual void FinishWithError(const Status& status, void* tag) = 0; }; +template <class W> +class ServerAsyncWriterInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface<W> { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation with a non-ok + /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if + /// some failure occurred when trying to do so. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish + /// in a single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; +}; + +/// Server-side interface for asynchronous bi-directional streaming. +template <class W, class R> +class ServerAsyncReaderWriterInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface<W>, + public internal::AsyncReaderInterface<R> { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation + /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' + /// with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if some + /// failure occurred when trying to do so. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; +}; + /// Async server-side API for doing client-streaming RPCs, /// where the incoming message stream from the client has messages of type \a R, /// and the single response message sent from the server is type \a W. @@ -624,56 +742,19 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> finish_ops_; }; -template <class W> -class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W> { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation with a non-ok - /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if - /// some failure occurred when trying to do so. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish - /// in a single step. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, WriteOptions options, - const Status& status, void* tag) = 0; -}; - /// Async server-side API for doing server streaming RPCs, /// where the outgoing message stream from the server has messages of type \a W. template <class W> @@ -755,7 +836,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } template <class T> void EnsureInitialMetadataSent(T* ops) { @@ -769,55 +850,17 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { } } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template <class W, class R> -class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation - /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' - /// with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if some - /// failure occurred when trying to do so. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish in a - /// single step. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, WriteOptions options, - const Status& status, void* tag) = 0; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; }; /// Async server-side API for doing bidirectional streaming RPCs, @@ -912,7 +955,7 @@ class ServerAsyncReaderWriter final private: friend class ::grpc::Server; - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } template <class T> void EnsureInitialMetadataSent(T* ops) { @@ -926,14 +969,18 @@ class ServerAsyncReaderWriter final } } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 41b3ae3f28..45a8e8ee6a 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -75,17 +75,18 @@ class ClientAsyncResponseReader final /// intitial metadata sent) and \a request has been written out. /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. - template <class W> - static ClientAsyncResponseReader* Create(ChannelInterface* channel, - CompletionQueue* cq, - const RpcMethod& method, - ClientContext* context, - const W& request) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader))) - ClientAsyncResponseReader(call, context, request); - } + struct internal { + template <class W> + static ClientAsyncResponseReader* Create( + ::grpc::ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + const W& request) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader))) + ClientAsyncResponseReader(call, context, request); + } + }; /// TODO(vjpai): Delete the below constructor /// PLEASE DO NOT USE THIS CONSTRUCTOR IN NEW CODE @@ -94,9 +95,10 @@ class ClientAsyncResponseReader final /// created this struct rather than properly using a stub. /// This code will not remain a valid public constructor for long. template <class W> - ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request) + ClientAsyncResponseReader(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)), collection_(std::make_shared<Ops>()) { @@ -164,10 +166,11 @@ class ClientAsyncResponseReader final private: ClientContext* const context_; - Call call_; + ::grpc::internal::Call call_; template <class W> - ClientAsyncResponseReader(Call call, ClientContext* context, const W& request) + ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context, + const W& request) : context_(context), call_(call) { ops_.init_buf.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); @@ -183,13 +186,17 @@ class ClientAsyncResponseReader final // TODO(vjpai): Remove the reference to CallOpSetCollectionInterface // as soon as the related workaround (public constructor) is deleted - struct Ops : public CallOpSetCollectionInterface { - SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> + struct Ops : public ::grpc::internal::CallOpSetCollectionInterface { + ::grpc::internal::SneakyCallOpSet< + ::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> init_buf; - CallOpSet<CallOpRecvInitialMetadata> meta_buf; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, - CallOpClientRecvStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_buf; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>, + ::grpc::internal::CallOpClientRecvStatus> finish_buf; } ops_; @@ -201,7 +208,8 @@ class ClientAsyncResponseReader final /// Async server-side API for handling unary calls, where the single /// response message sent to the client is of type \a W. template <class W> -class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { +class ServerAsyncResponseWriter final + : public internal::ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -289,13 +297,15 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_buf_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_buf_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> finish_buf_; }; diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 342ea46203..32bd2ad5c7 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -44,11 +44,13 @@ struct grpc_byte_buffer; namespace grpc { class ByteBuffer; -class Call; -class CallHook; class CompletionQueue; extern CoreCodegenInterface* g_core_codegen_interface; +namespace internal { +class Call; +class CallHook; + const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin"; // TODO(yangg) if the map is changed before we send, the pointers will be a @@ -76,6 +78,7 @@ inline grpc_metadata* FillMetadataArray( } return metadata_array; } +} // namespace internal /// Per-message write options. class WriteOptions { @@ -191,6 +194,7 @@ class WriteOptions { bool last_message_; }; +namespace internal { /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template <int I> @@ -673,7 +677,7 @@ class Call final { grpc_call* call_; int max_receive_message_size_; }; - +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CALL_H diff --git a/include/grpc++/impl/codegen/call_hook.h b/include/grpc++/impl/codegen/call_hook.h index d026cc8b58..44e9de220e 100644 --- a/include/grpc++/impl/codegen/call_hook.h +++ b/include/grpc++/impl/codegen/call_hook.h @@ -21,6 +21,7 @@ namespace grpc { +namespace internal { class CallOpSetInterface; class Call; @@ -31,6 +32,7 @@ class CallHook { virtual ~CallHook() {} virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h index 1b7590bf0c..cf1d77e905 100644 --- a/include/grpc++/impl/codegen/channel_interface.h +++ b/include/grpc++/impl/codegen/channel_interface.h @@ -24,10 +24,8 @@ #include <grpc/impl/codegen/connectivity_state.h> namespace grpc { -class Call; +class ChannelInterface; class ClientContext; -class RpcMethod; -class CallOpSetInterface; class CompletionQueue; template <class R> @@ -45,6 +43,16 @@ class ClientAsyncReaderWriter; template <class R> class ClientAsyncResponseReader; +namespace internal { +class Call; +class CallOpSetInterface; +class RpcMethod; +template <class InputMessage, class OutputMessage> +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal + /// Codegen interface for \a grpc::Channel. class ChannelInterface { public: @@ -96,15 +104,16 @@ class ChannelInterface { template <class R> friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); - friend class ::grpc::RpcMethod; - virtual Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) = 0; - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); + friend class ::grpc::internal::RpcMethod; + virtual internal::Call CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) = 0; + virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) = 0; virtual void* RegisterMethod(const char* method) = 0; virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, @@ -112,7 +121,6 @@ class ChannelInterface { virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; }; - } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CHANNEL_INTERFACE_H diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index c2a44e41ce..22d0069afa 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -60,7 +60,18 @@ class Channel; class ChannelInterface; class CompletionQueue; class CallCredentials; +class ClientContext; + +namespace internal { class RpcMethod; +class CallOpClientRecvStatus; +class CallOpRecvInitialMetadata; +template <class InputMessage, class OutputMessage> +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal + template <class R> class ClientReader; template <class W> @@ -345,8 +356,8 @@ class ClientContext { ClientContext& operator=(const ClientContext&); friend class ::grpc::testing::InteropClientContextInspector; - friend class CallOpClientRecvStatus; - friend class CallOpRecvInitialMetadata; + friend class ::grpc::internal::CallOpClientRecvStatus; + friend class ::grpc::internal::CallOpRecvInitialMetadata; friend class Channel; template <class R> friend class ::grpc::ClientReader; @@ -363,11 +374,10 @@ class ClientContext { template <class R> friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); grpc_call* call() const { return call_; } void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel); @@ -399,8 +409,8 @@ class ClientContext { mutable std::shared_ptr<const AuthContext> auth_context_; struct census_context* census_context_; std::multimap<grpc::string, grpc::string> send_initial_metadata_; - MetadataMap recv_initial_metadata_; - MetadataMap trailing_metadata_; + internal::MetadataMap recv_initial_metadata_; + internal::MetadataMap trailing_metadata_; grpc_call* propagate_from_call_; PropagationOptions propagation_options_; diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 7c540fade9..8fef3ab353 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -30,8 +30,9 @@ namespace grpc { class Channel; class ClientContext; class CompletionQueue; -class RpcMethod; +namespace internal { +class RpcMethod; /// Wrapper that performs a blocking unary call template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, @@ -67,6 +68,7 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, return status; } +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index ca757e2a9c..a04778aa72 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -56,7 +56,19 @@ class ServerWriter; namespace internal { template <class W, class R> class ServerReaderWriterBody; -} +} // namespace internal + +class Channel; +class ChannelInterface; +class ClientContext; +class CompletionQueue; +class Server; +class ServerBuilder; +class ServerContext; + +namespace internal { +class CompletionQueueTag; +class RpcMethod; template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> @@ -66,16 +78,13 @@ class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; class UnknownMethodHandler; - -class Channel; -class ChannelInterface; -class ClientContext; -class CompletionQueueTag; -class CompletionQueue; -class RpcMethod; -class Server; -class ServerBuilder; -class ServerContext; +template <class Streamer, bool WriteNeeded> +class TemplatedBidiStreamingHandler; +template <class InputMessage, class OutputMessage> +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal extern CoreCodegenInterface* g_core_codegen_interface; @@ -196,28 +205,27 @@ class CompletionQueue : private GrpcLibraryCodegen { template <class W, class R> friend class ::grpc::internal::ServerReaderWriterBody; template <class ServiceType, class RequestType, class ResponseType> - friend class RpcMethodHandler; + friend class ::grpc::internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ClientStreamingHandler; + friend class ::grpc::internal::ClientStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ServerStreamingHandler; + friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> - friend class TemplatedBidiStreamingHandler; - friend class UnknownMethodHandler; + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. /// \warning Must not be mixed with calls to \a Next. - bool Pluck(CompletionQueueTag* tag) { + bool Pluck(internal::CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( @@ -238,7 +246,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// implementation to simple call the other TryPluck function with a zero /// timeout. i.e: /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) - void TryPluck(CompletionQueueTag* tag) { + void TryPluck(internal::CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); @@ -254,7 +262,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects /// that the tag is internal not something that is returned to the user. - void TryPluck(CompletionQueueTag* tag, gpr_timespec deadline) { + void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) { auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { diff --git a/include/grpc++/impl/codegen/completion_queue_tag.h b/include/grpc++/impl/codegen/completion_queue_tag.h index 4d7d3a98dd..cb16bcf9ff 100644 --- a/include/grpc++/impl/codegen/completion_queue_tag.h +++ b/include/grpc++/impl/codegen/completion_queue_tag.h @@ -21,6 +21,7 @@ namespace grpc { +namespace internal { /// An interface allowing implementors to process and filter event tags. class CompletionQueueTag { public: @@ -31,6 +32,7 @@ class CompletionQueueTag { /// queue virtual bool FinalizeResult(void** tag, bool* status) = 0; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/metadata_map.h b/include/grpc++/impl/codegen/metadata_map.h index b73985967d..fd4750efdd 100644 --- a/include/grpc++/impl/codegen/metadata_map.h +++ b/include/grpc++/impl/codegen/metadata_map.h @@ -23,6 +23,7 @@ namespace grpc { +namespace internal { class MetadataMap { public: MetadataMap() { memset(&arr_, 0, sizeof(arr_)); } @@ -50,6 +51,7 @@ class MetadataMap { grpc_metadata_array arr_; std::multimap<grpc::string_ref, grpc::string_ref> map_; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 15e24bdcdc..87e9e5e952 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -25,6 +25,7 @@ namespace grpc { +namespace internal { /// A wrapper class of an application provided rpc method handler. template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler : public MethodHandler { @@ -265,6 +266,7 @@ class UnknownMethodHandler : public MethodHandler { } }; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index ac13ac56c7..54e52364ef 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -24,7 +24,7 @@ #include <grpc++/impl/codegen/channel_interface.h> namespace grpc { - +namespace internal { /// Descriptor of an RPC method class RpcMethod { public: @@ -55,6 +55,7 @@ class RpcMethod { void* const channel_tag_; }; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h index 7165774172..635e40469b 100644 --- a/include/grpc++/impl/codegen/rpc_service_method.h +++ b/include/grpc++/impl/codegen/rpc_service_method.h @@ -35,8 +35,8 @@ struct grpc_byte_buffer; namespace grpc { class ServerContext; -class StreamContextInterface; +namespace internal { /// Base class for running an RPC handler. class MethodHandler { public: @@ -71,6 +71,7 @@ class RpcServiceMethod : public RpcMethod { void* server_tag_; std::unique_ptr<MethodHandler> handler_; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index b5e37fd12b..a2d6967bf8 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -55,7 +55,6 @@ class ServerWriter; namespace internal { template <class W, class R> class ServerReaderWriterBody; -} template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> @@ -65,9 +64,11 @@ class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; class UnknownMethodHandler; - +template <class Streamer, bool WriteNeeded> +class TemplatedBidiStreamingHandler; class Call; -class CallOpBuffer; +} // namespace internal + class CompletionQueue; class Server; class ServerInterface; @@ -247,14 +248,14 @@ class ServerContext { template <class W, class R> friend class ::grpc::internal::ServerReaderWriterBody; template <class ServiceType, class RequestType, class ResponseType> - friend class RpcMethodHandler; + friend class ::grpc::internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ClientStreamingHandler; + friend class ::grpc::internal::ClientStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ServerStreamingHandler; + friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> - friend class TemplatedBidiStreamingHandler; - friend class UnknownMethodHandler; + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. @@ -263,9 +264,9 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(Call* call); + void BeginCompletionOp(internal::Call* call); /// Return the tag queued by BeginCompletionOp() - CompletionQueueTag* GetCompletionOpTag(); + internal::CompletionQueueTag* GetCompletionOpTag(); ServerContext(gpr_timespec deadline, grpc_metadata_array* arr); @@ -282,7 +283,7 @@ class ServerContext { CompletionQueue* cq_; bool sent_initial_metadata_; mutable std::shared_ptr<const AuthContext> auth_context_; - MetadataMap client_metadata_; + internal::MetadataMap client_metadata_; std::multimap<grpc::string, grpc::string> initial_metadata_; std::multimap<grpc::string, grpc::string> trailing_metadata_; @@ -290,7 +291,9 @@ class ServerContext { grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> pending_ops_; + internal::CallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpSendMessage> + pending_ops_; bool has_pending_ops_; }; diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 87bd085a37..9d120031ca 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -29,20 +29,21 @@ namespace grpc { class AsyncGenericService; class GenericServerContext; -class RpcService; -class ServerAsyncStreamingInterface; class ServerCompletionQueue; class ServerContext; class ServerCredentials; class Service; -class ThreadPoolInterface; extern CoreCodegenInterface* g_core_codegen_interface; /// Models a gRPC server. /// /// Servers are configured and started via \a grpc::ServerBuilder. -class ServerInterface : public CallHook { +namespace internal { +class ServerAsyncStreamingInterface; +} // namespace internal + +class ServerInterface : public internal::CallHook { public: virtual ~ServerInterface() {} @@ -77,7 +78,7 @@ class ServerInterface : public CallHook { virtual void Wait() = 0; protected: - friend class Service; + friend class ::grpc::Service; /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -115,12 +116,13 @@ class ServerInterface : public CallHook { virtual grpc_server* server() = 0; - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) = 0; - class BaseAsyncRequest : public CompletionQueueTag { + class BaseAsyncRequest : public internal::CompletionQueueTag { public: BaseAsyncRequest(ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, bool delete_on_finalize); virtual ~BaseAsyncRequest(); @@ -130,7 +132,7 @@ class ServerInterface : public CallHook { protected: ServerInterface* const server_; ServerContext* const context_; - ServerAsyncStreamingInterface* const stream_; + internal::ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; void* const tag_; const bool delete_on_finalize_; @@ -140,7 +142,7 @@ class ServerInterface : public CallHook { class RegisteredAsyncRequest : public BaseAsyncRequest { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag); // uses BaseAsyncRequest::FinalizeResult @@ -154,7 +156,7 @@ class ServerInterface : public CallHook { public: NoPayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { @@ -169,7 +171,7 @@ class ServerInterface : public CallHook { public: PayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* request) @@ -195,7 +197,7 @@ class ServerInterface : public CallHook { class GenericAsyncRequest : public BaseAsyncRequest { public: GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize); @@ -207,8 +209,9 @@ class ServerInterface : public CallHook { }; template <class Message> - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(internal::RpcServiceMethod* method, + ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* message) { @@ -218,8 +221,9 @@ class ServerInterface : public CallHook { message); } - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(internal::RpcServiceMethod* method, + ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { GPR_CODEGEN_ASSERT(method); @@ -228,7 +232,7 @@ class ServerInterface : public CallHook { } void RequestAsyncGenericCall(GenericServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index 2dc4ea0ea6..71c3d99d5c 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -28,13 +28,14 @@ namespace grpc { -class Call; class CompletionQueue; class Server; class ServerInterface; class ServerCompletionQueue; class ServerContext; +namespace internal { +class Call; class ServerAsyncStreamingInterface { public: virtual ~ServerAsyncStreamingInterface() {} @@ -48,9 +49,10 @@ class ServerAsyncStreamingInterface { virtual void SendInitialMetadata(void* tag) = 0; private: - friend class ServerInterface; + friend class ::grpc::ServerInterface; virtual void BindCall(Call* call) = 0; }; +} // namespace internal /// Desriptor of an RPC service and its various RPC methods class Service { @@ -88,40 +90,38 @@ class Service { protected: template <class Message> void RequestAsyncUnary(int index, ServerContext* context, Message* request, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncClientStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncClientStreaming( + int index, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } template <class Message> - void RequestAsyncServerStreaming(int index, ServerContext* context, - Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncServerStreaming( + int index, ServerContext* context, Message* request, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncBidiStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncBidiStreaming( + int index, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } - void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); } + void AddMethod(internal::RpcServiceMethod* method) { + methods_.emplace_back(method); + } void MarkMethodAsync(int index) { GPR_CODEGEN_ASSERT( @@ -139,7 +139,7 @@ class Service { methods_[index].reset(); } - void MarkMethodStreamed(int index, MethodHandler* streamed_method) { + void MarkMethodStreamed(int index, internal::MethodHandler* streamed_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && "Cannot mark an async or generic method Streamed"); methods_[index]->SetHandler(streamed_method); @@ -148,14 +148,14 @@ class Service { // case of BIDI_STREAMING that has 1 read and 1 write, in that order, // and split server-side streaming is BIDI_STREAMING with 1 read and // any number of writes, in that order. - methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING); + methods_[index]->SetMethodType(internal::RpcMethod::BIDI_STREAMING); } private: friend class Server; friend class ServerInterface; ServerInterface* server_; - std::vector<std::unique_ptr<RpcServiceMethod>> methods_; + std::vector<std::unique_ptr<internal::RpcServiceMethod>> methods_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 3fa208963d..f9c8303000 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -30,6 +30,7 @@ namespace grpc { +namespace internal { /// Common interface for all synchronous client side streaming. class ClientStreamingInterface { public: @@ -62,20 +63,6 @@ class ClientStreamingInterface { virtual Status Finish() = 0; }; -/// Common interface for all synchronous server side streaming. -class ServerStreamingInterface { - public: - virtual ~ServerStreamingInterface() {} - - /// Block to send initial metadata to client. - /// This call is optional, but if it is used, it cannot be used concurrently - /// with or after the \a Finish method. - /// - /// The initial metadata that will be sent to the client will be - /// taken from the \a ServerContext associated with the call. - virtual void SendInitialMetadata() = 0; -}; - /// An interface that yields a sequence of messages of type \a R. template <class R> class ReaderInterface { @@ -141,16 +128,55 @@ class WriterInterface { } }; +} // namespace internal + /// Client-side interface for streaming reads of message of type \a R. template <class R> -class ClientReaderInterface : public ClientStreamingInterface, - public ReaderInterface<R> { +class ClientReaderInterface : public internal::ClientStreamingInterface, + public internal::ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; +}; + +/// Client-side interface for streaming writes of message type \a W. +template <class W> +class ClientWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W> { + public: + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread safe with respect to \a ReaderInterface::Read operations only + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +/// Client-side interface for bi-directional streaming with +/// client-to-server stream messages of type \a W and +/// server-to-client stream messages of type \a R. +template <class W, class R> +class ClientReaderWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> { public: /// Block to wait for initial metadata from server. The received metadata /// can only be accessed after this call returns. Should only be called before /// the first read. Calling this method is optional, and if it is not called /// the metadata will be available in ClientContext after the first read. virtual void WaitForInitialMetadata() = 0; + + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread-safe with respect to \a ReaderInterface::Read + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; }; /// Synchronous (blocking) client-side API for doing server-streaming RPCs, @@ -159,28 +185,14 @@ class ClientReaderInterface : public ClientStreamingInterface, template <class R> class ClientReader final : public ClientReaderInterface<R> { public: - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial - /// metadata used to send to the server when starting the call. - template <class W> - ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> - ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } + struct internal { + template <class W> + static ClientReader* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) { + return new ClientReader(channel, method, context, request); + } + }; /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -192,7 +204,8 @@ class ClientReader final : public ClientReaderInterface<R> { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet<CallOpRecvInitialMetadata> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); /// status ignored @@ -209,7 +222,9 @@ class ClientReader final : public ClientReaderInterface<R> { /// already received (if initial metadata is received, it can be then /// accessed through the \a ClientContext associated with this call). bool Read(R* msg) override { - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -224,7 +239,7 @@ class ClientReader final : public ClientReaderInterface<R> { /// The \a ClientContext associated with this call is updated with /// possible metadata received from the server. Status Finish() override { - CallOpSet<CallOpClientRecvStatus> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); @@ -235,53 +250,48 @@ class ClientReader final : public ClientReaderInterface<R> { private: ClientContext* context_; CompletionQueue cq_; - Call call_; -}; + ::grpc::internal::Call call_; -/// Client-side interface for streaming writes of message type \a W. -template <class W> -class ClientWriterInterface : public ClientStreamingInterface, - public WriterInterface<W> { - public: - /// Half close writing from the client. (signal that the stream of messages - /// coming from the clinet is complete). - /// Blocks until currently-pending writes are completed. - /// Thread safe with respect to \a ReaderInterface::Read operations only - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial + /// metadata used to send to the server when starting the call. + template <class W> + ClientReader(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } }; /// Synchronous (blocking) client-side API for doing client-streaming RPCs, /// where the outgoing message stream coming from the client has messages of /// type \a W. template <class W> -class ClientWriter : public ClientWriterInterface<W> { +class ClientWriter final : public ClientWriterInterface<W> { public: - /// Block to create a stream (i.e. send request headers and other initial - /// metadata to the server). Note that \a context will be used to fill - /// in custom initial metadata. \a response will be filled in with the - /// single expected response message from the server upon a successful - /// call to the \a Finish method of this instance. - template <class R> - ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, R* response) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - - if (!context_->initial_metadata_corked_) { - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + struct internal { + template <class R> + static ClientWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) { + return new ClientWriter(channel, method, context, response); } - } + }; /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -292,7 +302,8 @@ class ClientWriter : public ClientWriterInterface<W> { void WaitForInitialMetadata() { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet<CallOpRecvInitialMetadata> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -304,10 +315,11 @@ class ClientWriter : public ClientWriterInterface<W> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call). - using WriterInterface<W>::Write; + using ::grpc::internal::WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -328,7 +340,7 @@ class ClientWriter : public ClientWriterInterface<W> { } bool WritesDone() override { - CallOpSet<CallOpClientSendClose> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -353,61 +365,55 @@ class ClientWriter : public ClientWriterInterface<W> { private: ClientContext* context_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_ops_; CompletionQueue cq_; - Call call_; -}; + ::grpc::internal::Call call_; -/// Client-side interface for bi-directional streaming with -/// client-to-server stream messages of type \a W and -/// server-to-client stream messages of type \a R. -template <class W, class R> -class ClientReaderWriterInterface : public ClientStreamingInterface, - public WriterInterface<W>, - public ReaderInterface<R> { - public: - /// Block to wait for initial metadata from server. The received metadata - /// can only be accessed after this call returns. Should only be called before - /// the first read. Calling this method is optional, and if it is not called - /// the metadata will be available in ClientContext after the first read. - virtual void WaitForInitialMetadata() = 0; - - /// Half close writing from the client. (signal that the stream of messages - /// coming from the clinet is complete). - /// Blocks until currently-pending writes are completed. - /// Thread-safe with respect to \a ReaderInterface::Read - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; -}; - -/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, -/// where the outgoing message stream coming from the client has messages of -/// type \a W, and the incoming messages stream coming from the server has -/// messages of type \a R. -template <class W, class R> -class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { - public: - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context) + /// Block to create a stream (i.e. send request headers and other initial + /// metadata to the server). Note that \a context will be used to fill + /// in custom initial metadata. \a response will be filled in with the + /// single expected response message from the server upon a successful + /// call to the \a Finish method of this instance. + template <class R> + ClientWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) : context_(context), cq_(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + if (!context_->initial_metadata_corked_) { - CallOpSet<CallOpSendInitialMetadata> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; ops.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); call_.PerformOps(&ops); cq_.Pluck(&ops); } } +}; + +/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W, and the incoming messages stream coming from the server has +/// messages of type \a R. +template <class W, class R> +class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { + public: + struct internal { + static ClientReaderWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) { + return new ClientReaderWriter(channel, method, context); + } + }; /// Block waiting to read initial metadata from the server. /// This call is optional, but if it is used, it cannot be used concurrently @@ -418,7 +424,8 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet<CallOpRecvInitialMetadata> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -434,7 +441,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Also receives initial metadata if not already received (updates the \a /// ClientContext associated with this call in that case). bool Read(R* msg) override { - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -448,10 +457,11 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using WriterInterface<W>::Write; + using ::grpc::internal::WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -472,7 +482,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { } bool WritesDone() override { - CallOpSet<CallOpClientSendClose> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -484,7 +494,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// - the \a ClientContext associated with this call is updated with /// possible trailing metadata sent from the server. Status Finish() override { - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -498,13 +510,61 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { private: ClientContext* context_; CompletionQueue cq_; - Call call_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + ClientReaderWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } }; +namespace internal { +/// Common interface for all synchronous server side streaming. +class ServerStreamingInterface { + public: + virtual ~ServerStreamingInterface() {} + + /// Block to send initial metadata to client. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// The initial metadata that will be sent to the client will be + /// taken from the \a ServerContext associated with the call. + virtual void SendInitialMetadata() = 0; +}; +} // namespace internal + /// Server-side interface for streaming reads of message of type \a R. template <class R> -class ServerReaderInterface : public ServerStreamingInterface, - public ReaderInterface<R> {}; +class ServerReaderInterface : public internal::ServerStreamingInterface, + public internal::ReaderInterface<R> {}; + +/// Server-side interface for streaming writes of message of type \a W. +template <class W> +class ServerWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W> {}; + +/// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> {}; /// Synchronous (blocking) server-side API for doing client-streaming RPCs, /// where the incoming message stream coming from the client has messages of @@ -512,15 +572,13 @@ class ServerReaderInterface : public ServerStreamingInterface, template <class R> class ServerReader final : public ServerReaderInterface<R> { public: - ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - CallOpSet<CallOpSendInitialMetadata> ops; + internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -537,21 +595,22 @@ class ServerReader final : public ServerReaderInterface<R> { } bool Read(R* msg) override { - CallOpSet<CallOpRecvMessage<R>> ops; + internal::CallOpSet<internal::CallOpRecvMessage<R>> ops; ops.RecvMessage(msg); call_->PerformOps(&ops); return call_->cq()->Pluck(&ops) && ops.got_message; } private: - Call* const call_; + internal::Call* const call_; ServerContext* const ctx_; -}; -/// Server-side interface for streaming writes of message of type \a W. -template <class W> -class ServerWriterInterface : public ServerStreamingInterface, - public WriterInterface<W> {}; + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ClientStreamingHandler; + + ServerReader(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; /// Synchronous (blocking) server-side API for doing for doing a /// server-streaming RPCs, where the outgoing message stream coming from the @@ -559,8 +618,6 @@ class ServerWriterInterface : public ServerStreamingInterface, template <class W> class ServerWriter final : public ServerWriterInterface<W> { public: - ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. /// Note that initial metadata will be affected by the @@ -568,7 +625,7 @@ class ServerWriter final : public ServerWriterInterface<W> { void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - CallOpSet<CallOpSendInitialMetadata> ops; + internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -584,11 +641,12 @@ class ServerWriter final : public ServerWriterInterface<W> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using WriterInterface<W>::Write; + using internal::WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { if (options.is_last_message()) { options.set_buffer_hint(); } + if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { return false; } @@ -613,15 +671,15 @@ class ServerWriter final : public ServerWriterInterface<W> { } private: - Call* const call_; + internal::Call* const call_; ServerContext* const ctx_; -}; -/// Server-side interface for bi-directional streaming. -template <class W, class R> -class ServerReaderWriterInterface : public ServerStreamingInterface, - public WriterInterface<W>, - public ReaderInterface<R> {}; + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ServerStreamingHandler; + + ServerWriter(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; /// Actual implementation of bi-directional streaming namespace internal { @@ -688,6 +746,7 @@ class ServerReaderWriterBody final { Call* const call_; ServerContext* const ctx_; }; + } // namespace internal /// Synchronous (blocking) server-side API for a bidirectional @@ -697,8 +756,6 @@ class ServerReaderWriterBody final { template <class W, class R> class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { public: - ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. @@ -715,13 +772,18 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { /// Side effect: /// Also sends initial metadata if not already sent (using the \a /// ServerContext associated with this call). - using WriterInterface<W>::Write; + using internal::WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { return body_.Write(msg, options); } private: internal::ServerReaderWriterBody<W, R> body_; + + friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, + false>; + ServerReaderWriter(internal::Call* call, ServerContext* ctx) + : body_(call, ctx) {} }; /// A class to represent a flow-controlled unary call. This is something @@ -736,9 +798,6 @@ template <class RequestType, class ResponseType> class ServerUnaryStreamer final : public ServerReaderWriterInterface<ResponseType, RequestType> { public: - ServerUnaryStreamer(Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false), write_done_(false) {} - /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -775,7 +834,7 @@ class ServerUnaryStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using WriterInterface<ResponseType>::Write; + using internal::WriterInterface<ResponseType>::Write; bool Write(const ResponseType& response, WriteOptions options) override { if (write_done_ || !read_done_) { return false; @@ -788,6 +847,11 @@ class ServerUnaryStreamer final internal::ServerReaderWriterBody<ResponseType, RequestType> body_; bool read_done_; bool write_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true>; + ServerUnaryStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false), write_done_(false) {} }; /// A class to represent a flow-controlled server-side streaming call. @@ -799,9 +863,6 @@ template <class RequestType, class ResponseType> class ServerSplitStreamer final : public ServerReaderWriterInterface<ResponseType, RequestType> { public: - ServerSplitStreamer(Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false) {} - /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -838,7 +899,7 @@ class ServerSplitStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using WriterInterface<ResponseType>::Write; + using internal::WriterInterface<ResponseType>::Write; bool Write(const ResponseType& response, WriteOptions options) override { return read_done_ && body_.Write(response, options); } @@ -846,6 +907,11 @@ class ServerSplitStreamer final private: internal::ServerReaderWriterBody<ResponseType, RequestType> body_; bool read_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false>; + ServerSplitStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/time.h b/include/grpc++/impl/codegen/time.h index 589deb4f03..d464d6ea13 100644 --- a/include/grpc++/impl/codegen/time.h +++ b/include/grpc++/impl/codegen/time.h @@ -19,6 +19,8 @@ #ifndef GRPCXX_IMPL_CODEGEN_TIME_H #define GRPCXX_IMPL_CODEGEN_TIME_H +#include <chrono> + #include <grpc++/impl/codegen/config.h> #include <grpc/impl/codegen/grpc_types.h> @@ -59,10 +61,6 @@ class TimePoint<gpr_timespec> { } // namespace grpc -#include <chrono> - -#include <grpc/impl/codegen/grpc_types.h> - namespace grpc { // from and to should be absolute time. diff --git a/include/grpc++/server.h b/include/grpc++/server.h index d76a745ac9..baf0ded9ab 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -172,7 +172,8 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// \param num_cqs How many completion queues does \a cqs hold. void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; + void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) override; void ShutdownInternal(gpr_timespec deadline) override; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 50f947d4c7..f3752abc1f 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -40,7 +40,6 @@ namespace grpc { class AsyncGenericService; class ResourceQuota; class CompletionQueue; -class RpcService; class Server; class ServerCompletionQueue; class ServerCredentials; |