diff options
author | Craig Tiller <ctiller@google.com> | 2017-08-31 10:54:11 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-08-31 10:54:11 -0700 |
commit | 8fd40d5ed9303c693865ff431b7565173a4b898e (patch) | |
tree | 5d3f6ce958de74e79a8bbaf71b7eb8bbd8acfecd /include | |
parent | c9c78ee96c5a73234513decd398a63d48f14aa89 (diff) | |
parent | 8d51e8d17e012f81ca8e94c18f525e1781130481 (diff) |
Merge github.com:grpc/grpc into wc
Diffstat (limited to 'include')
34 files changed, 817 insertions, 835 deletions
diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h index 00a9306d9d..ed8dacbc94 100644 --- a/include/grpc++/alarm.h +++ b/include/grpc++/alarm.h @@ -77,7 +77,7 @@ class Alarm : private GrpcLibraryCodegen { void Cancel() { grpc_alarm_cancel(alarm_); } private: - class AlarmEntry : public internal::CompletionQueueTag { + class AlarmEntry : public CompletionQueueTag { public: AlarmEntry(void* tag) : tag_(tag) {} bool FinalizeResult(void** tag, bool* status) override { diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index 5ba3c591f0..c50091d6ac 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -32,7 +32,7 @@ struct grpc_channel; namespace grpc { /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, - public internal::CallHook, + public CallHook, public std::enable_shared_from_this<Channel>, private GrpcLibraryCodegen { public: @@ -52,7 +52,7 @@ class Channel final : public ChannelInterface, private: template <class InputMessage, class OutputMessage> friend Status BlockingUnaryCall(ChannelInterface* channel, - const internal::RpcMethod& method, + const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result); @@ -60,11 +60,9 @@ class Channel final : public ChannelInterface, const grpc::string& host, grpc_channel* c_channel); Channel(const grpc::string& host, grpc_channel* c_channel); - internal::Call CreateCall(const internal::RpcMethod& method, - ClientContext* context, - CompletionQueue* cq) override; - void PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) override; + Call CreateCall(const RpcMethod& method, ClientContext* context, + CompletionQueue* cq) override; + void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; void* RegisterMethod(const char* method) override; void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index ddbf3e655e..9cf7ac30dd 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -30,7 +30,6 @@ namespace grpc { class CompletionQueue; -namespace internal { /// Common interface for all client side asynchronous streaming. class ClientAsyncStreamingInterface { public: @@ -147,41 +146,9 @@ class AsyncWriterInterface { } }; -} // namespace internal - template <class R> -class ClientAsyncReaderInterface - : public internal::ClientAsyncStreamingInterface, - public internal::AsyncReaderInterface<R> {}; - -/// Common interface for client side asynchronous writing. -template <class W> -class ClientAsyncWriterInterface - : public internal::ClientAsyncStreamingInterface, - public internal::AsyncWriterInterface<W> { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -/// Async client-side interface for bi-directional streaming, -/// where the client-to-server message stream has messages of type \a W, -/// and the server-to-client message stream has messages of type \a R. -template <class W, class R> -class ClientAsyncReaderWriterInterface - : public internal::ClientAsyncStreamingInterface, - public internal::AsyncWriterInterface<W>, - public internal::AsyncReaderInterface<R> { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; +class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, + public AsyncReaderInterface<R> {}; /// Async client-side API for doing server-streaming RPCs, /// where the incoming message stream coming from the server has @@ -189,24 +156,21 @@ class ClientAsyncReaderWriterInterface template <class R> class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { public: - struct internal { - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started and - /// \a request has been written out. - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template <class W> - static ClientAsyncReader* Create(::grpc::ChannelInterface* channel, - CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, const W& request, - void* tag) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReader))) - ClientAsyncReader(call, context, request, tag); - } - }; + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started and + /// \a request has been written out. + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template <class W> + static ClientAsyncReader* Create(ChannelInterface* channel, + CompletionQueue* cq, const RpcMethod& method, + ClientContext* context, const W& request, + void* tag) { + Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader))) + ClientAsyncReader(call, context, request, tag); + } // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -254,8 +218,8 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { private: template <class W> - ClientAsyncReader(::grpc::internal::Call call, ClientContext* context, - const W& request, void* tag) + ClientAsyncReader(Call call, ClientContext* context, const W& request, + void* tag) : context_(context), call_(call) { init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_, @@ -267,19 +231,24 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { } ClientContext* context_; - ::grpc::internal::Call call_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> + Call call_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - meta_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpRecvMessage<R>> - read_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpClientRecvStatus> - finish_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; +}; + +/// Common interface for client side asynchronous writing. +template <class W> +class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, + public AsyncWriterInterface<W> { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; }; /// Async API on the client side for doing client-streaming RPCs, @@ -288,27 +257,24 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { template <class W> class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { public: - struct internal { - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - /// \a response will be filled in with the single expected response - /// message from the server upon a successful call to the \a Finish - /// method of this instance. - template <class R> - static ClientAsyncWriter* Create(::grpc::ChannelInterface* channel, - CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, R* response, - void* tag) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncWriter))) - ClientAsyncWriter(call, context, response, tag); - } - }; + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + /// \a response will be filled in with the single expected response + /// message from the server upon a successful call to the \a Finish + /// method of this instance. + template <class R> + static ClientAsyncWriter* Create(ChannelInterface* channel, + CompletionQueue* cq, const RpcMethod& method, + ClientContext* context, R* response, + void* tag) { + Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter))) + ClientAsyncWriter(call, context, response, tag); + } // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -372,8 +338,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { private: template <class R> - ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context, - R* response, void* tag) + ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag) : context_(context), call_(call) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -391,20 +356,31 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { } ClientContext* context_; - ::grpc::internal::Call call_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - meta_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> + Call call_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> write_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpGenericRecvMessage, - ::grpc::internal::CallOpClientRecvStatus> + CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, + CallOpClientRecvStatus> finish_ops_; }; /// Async client-side interface for bi-directional streaming, +/// where the client-to-server message stream has messages of type \a W, +/// and the server-to-client message stream has messages of type \a R. +template <class W, class R> +class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; + +/// Async client-side interface for bi-directional streaming, /// where the outgoing message stream going to the server /// has messages of type \a W, and the incoming message stream coming /// from the server has messages of type \a R. @@ -412,23 +388,21 @@ template <class W, class R> class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface<W, R> { public: - struct internal { - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - static ClientAsyncReaderWriter* Create( - ::grpc::ChannelInterface* channel, CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, ClientContext* context, - void* tag) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReaderWriter))) - ClientAsyncReaderWriter(call, context, tag); - } - }; + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent). + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + static ClientAsyncReaderWriter* Create(ChannelInterface* channel, + CompletionQueue* cq, + const RpcMethod& method, + ClientContext* context, void* tag) { + Call call = channel->CreateCall(method, context, cq); + + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter))) + ClientAsyncReaderWriter(call, context, tag); + } // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -497,8 +471,7 @@ class ClientAsyncReaderWriter final } private: - ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context, - void* tag) + ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag) : context_(context), call_(call) { if (context_->initial_metadata_corked_) { // if corked bit is set in context, we buffer up the initial metadata to @@ -514,25 +487,17 @@ class ClientAsyncReaderWriter final } ClientContext* context_; - ::grpc::internal::Call call_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - meta_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpRecvMessage<R>> - read_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> + Call call_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> write_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpClientRecvStatus> - finish_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; template <class W, class R> -class ServerAsyncReaderInterface - : public internal::ServerAsyncStreamingInterface, - public internal::AsyncReaderInterface<R> { +class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, + public AsyncReaderInterface<R> { public: /// Indicate that the stream is to be finished with a certain status code /// and also send out \a msg response to the client. @@ -576,89 +541,6 @@ class ServerAsyncReaderInterface virtual void FinishWithError(const Status& status, void* tag) = 0; }; -template <class W> -class ServerAsyncWriterInterface - : public internal::ServerAsyncStreamingInterface, - public internal::AsyncWriterInterface<W> { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation with a non-ok - /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if - /// some failure occurred when trying to do so. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish - /// in a single step. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, WriteOptions options, - const Status& status, void* tag) = 0; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template <class W, class R> -class ServerAsyncReaderWriterInterface - : public internal::ServerAsyncStreamingInterface, - public internal::AsyncWriterInterface<W>, - public internal::AsyncReaderInterface<R> { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation - /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' - /// with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if some - /// failure occurred when trying to do so. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish in a - /// single step. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, WriteOptions options, - const Status& status, void* tag) = 0; -}; - /// Async server-side API for doing client-streaming RPCs, /// where the incoming message stream from the client has messages of type \a R, /// and the single response message sent from the server is type \a W. @@ -742,19 +624,56 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { } private: - void BindCall(::grpc::internal::Call* call) override { call_ = *call; } + void BindCall(Call* call) override { call_ = *call; } - ::grpc::internal::Call call_; + Call call_; ServerContext* ctx_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_ops_; }; +template <class W> +class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, + public AsyncWriterInterface<W> { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation with a non-ok + /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if + /// some failure occurred when trying to do so. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish + /// in a single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; +}; + /// Async server-side API for doing server streaming RPCs, /// where the outgoing message stream from the server has messages of type \a W. template <class W> @@ -836,7 +755,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { } private: - void BindCall(::grpc::internal::Call* call) override { call_ = *call; } + void BindCall(Call* call) override { call_ = *call; } template <class T> void EnsureInitialMetadataSent(T* ops) { @@ -850,17 +769,55 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { } } - ::grpc::internal::Call call_; + Call call_; ServerContext* ctx_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> write_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpServerSendStatus> - finish_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; +}; + +/// Server-side interface for asynchronous bi-directional streaming. +template <class W, class R> +class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation + /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' + /// with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if some + /// failure occurred when trying to do so. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; }; /// Async server-side API for doing bidirectional streaming RPCs, @@ -955,7 +912,7 @@ class ServerAsyncReaderWriter final private: friend class ::grpc::Server; - void BindCall(::grpc::internal::Call* call) override { call_ = *call; } + void BindCall(Call* call) override { call_ = *call; } template <class T> void EnsureInitialMetadataSent(T* ops) { @@ -969,18 +926,14 @@ class ServerAsyncReaderWriter final } } - ::grpc::internal::Call call_; + Call call_; ServerContext* ctx_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> write_ops_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpServerSendStatus> - finish_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 45a8e8ee6a..f0f909686b 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -75,40 +75,16 @@ class ClientAsyncResponseReader final /// intitial metadata sent) and \a request has been written out. /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. - struct internal { - template <class W> - static ClientAsyncResponseReader* Create( - ::grpc::ChannelInterface* channel, CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, ClientContext* context, - const W& request) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader))) - ClientAsyncResponseReader(call, context, request); - } - }; - - /// TODO(vjpai): Delete the below constructor - /// PLEASE DO NOT USE THIS CONSTRUCTOR IN NEW CODE - /// This code is only present as a short-term workaround - /// for users that bypassed the code-generator and directly - /// created this struct rather than properly using a stub. - /// This code will not remain a valid public constructor for long. template <class W> - ClientAsyncResponseReader(::grpc::ChannelInterface* channel, - CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), - call_(channel->CreateCall(method, context, cq)), - collection_(std::make_shared<Ops>()) { - collection_->init_buf.SetCollection(collection_); - collection_->init_buf.SendInitialMetadata( - context->send_initial_metadata_, context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(collection_->init_buf.SendMessage(request).ok()); - collection_->init_buf.ClientSendClose(); - call_.PerformOps(&collection_->init_buf); + static ClientAsyncResponseReader* Create(ChannelInterface* channel, + CompletionQueue* cq, + const RpcMethod& method, + ClientContext* context, + const W& request) { + Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader))) + ClientAsyncResponseReader(call, context, request); } // always allocated against a call arena, no memory free required @@ -121,22 +97,13 @@ class ClientAsyncResponseReader final /// /// Side effect: /// - the \a ClientContext associated with this call is updated with - /// possible initial and trailing metadata sent from the serve. + /// possible initial and trailing metadata sent from the server. void ReadInitialMetadata(void* tag) { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - Ops* o = &ops_; - - // TODO(vjpai): Remove the collection_ specialization as soon - // as the public constructor is deleted - if (collection_) { - o = collection_.get(); - collection_->meta_buf.SetCollection(collection_); - } - - o->meta_buf.set_output_tag(tag); - o->meta_buf.RecvInitialMetadata(context_); - call_.PerformOps(&o->meta_buf); + meta_buf.set_output_tag(tag); + meta_buf.RecvInitialMetadata(context_); + call_.PerformOps(&meta_buf); } /// See \a ClientAysncResponseReaderInterface::Finish for semantics. @@ -145,71 +112,48 @@ class ClientAsyncResponseReader final /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. void Finish(R* msg, Status* status, void* tag) { - Ops* o = &ops_; - - // TODO(vjpai): Remove the collection_ specialization as soon - // as the public constructor is deleted - if (collection_) { - o = collection_.get(); - collection_->finish_buf.SetCollection(collection_); - } - - o->finish_buf.set_output_tag(tag); + finish_buf.set_output_tag(tag); if (!context_->initial_metadata_received_) { - o->finish_buf.RecvInitialMetadata(context_); + finish_buf.RecvInitialMetadata(context_); } - o->finish_buf.RecvMessage(msg); - o->finish_buf.AllowNoMessage(); - o->finish_buf.ClientRecvStatus(context_, status); - call_.PerformOps(&o->finish_buf); + finish_buf.RecvMessage(msg); + finish_buf.AllowNoMessage(); + finish_buf.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_buf); } private: ClientContext* const context_; - ::grpc::internal::Call call_; + Call call_; template <class W> - ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context, - const W& request) + ClientAsyncResponseReader(Call call, ClientContext* context, const W& request) : context_(context), call_(call) { - ops_.init_buf.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); + init_buf.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops_.init_buf.SendMessage(request).ok()); - ops_.init_buf.ClientSendClose(); - call_.PerformOps(&ops_.init_buf); + GPR_CODEGEN_ASSERT(init_buf.SendMessage(request).ok()); + init_buf.ClientSendClose(); + call_.PerformOps(&init_buf); } // disable operator new static void* operator new(std::size_t size); static void* operator new(std::size_t size, void* p) { return p; } - // TODO(vjpai): Remove the reference to CallOpSetCollectionInterface - // as soon as the related workaround (public constructor) is deleted - struct Ops : public ::grpc::internal::CallOpSetCollectionInterface { - ::grpc::internal::SneakyCallOpSet< - ::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> - init_buf; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - meta_buf; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpRecvMessage<R>, - ::grpc::internal::CallOpClientRecvStatus> - finish_buf; - } ops_; - - // TODO(vjpai): Remove the collection_ as soon as the related workaround - // (public constructor) is deleted - std::shared_ptr<Ops> collection_; + SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + init_buf; + CallOpSet<CallOpRecvInitialMetadata> meta_buf; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, + CallOpClientRecvStatus> + finish_buf; }; /// Async server-side API for handling unary calls, where the single /// response message sent to the client is of type \a W. template <class W> -class ServerAsyncResponseWriter final - : public internal::ServerAsyncStreamingInterface { +class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -297,15 +241,13 @@ class ServerAsyncResponseWriter final } private: - void BindCall(::grpc::internal::Call* call) override { call_ = *call; } + void BindCall(Call* call) override { call_ = *call; } - ::grpc::internal::Call call_; + Call call_; ServerContext* ctx_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - meta_buf_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_buf_; }; @@ -317,6 +259,11 @@ class default_delete<grpc::ClientAsyncResponseReader<R>> { public: void operator()(void* p) {} }; +template <class R> +class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> { + public: + void operator()(void* p) {} +}; } #endif // GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 9bbe7c63f4..0cb11b4cca 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -44,12 +44,10 @@ struct grpc_byte_buffer; namespace grpc { class ByteBuffer; -class CompletionQueue; -extern CoreCodegenInterface* g_core_codegen_interface; - -namespace internal { class Call; class CallHook; +class CompletionQueue; +extern CoreCodegenInterface* g_core_codegen_interface; const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin"; @@ -78,7 +76,6 @@ inline grpc_metadata* FillMetadataArray( } return metadata_array; } -} // namespace internal /// Per-message write options. class WriteOptions { @@ -194,7 +191,6 @@ class WriteOptions { bool last_message_; }; -namespace internal { /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template <int I> @@ -208,12 +204,14 @@ class CallOpSendInitialMetadata { public: CallOpSendInitialMetadata() : send_(false) { maybe_compression_level_.is_set = false; + maybe_stream_compression_level_.is_set = false; } void SendInitialMetadata( const std::multimap<grpc::string, grpc::string>& metadata, uint32_t flags) { maybe_compression_level_.is_set = false; + maybe_stream_compression_level_.is_set = false; send_ = true; flags_ = flags; initial_metadata_ = @@ -225,6 +223,11 @@ class CallOpSendInitialMetadata { maybe_compression_level_.level = level; } + void set_stream_compression_level(grpc_stream_compression_level level) { + maybe_stream_compression_level_.is_set = true; + maybe_stream_compression_level_.level = level; + } + protected: void AddOp(grpc_op* ops, size_t* nops) { if (!send_) return; @@ -240,6 +243,12 @@ class CallOpSendInitialMetadata { op->data.send_initial_metadata.maybe_compression_level.level = maybe_compression_level_.level; } + op->data.send_initial_metadata.maybe_stream_compression_level.is_set = + maybe_stream_compression_level_.is_set; + if (maybe_stream_compression_level_.is_set) { + op->data.send_initial_metadata.maybe_stream_compression_level.level = + maybe_stream_compression_level_.level; + } } void FinishOp(bool* status) { if (!send_) return; @@ -255,6 +264,10 @@ class CallOpSendInitialMetadata { bool is_set; grpc_compression_level level; } maybe_compression_level_; + struct { + bool is_set; + grpc_stream_compression_level level; + } maybe_stream_compression_level_; }; class CallOpSendMessage { @@ -353,6 +366,28 @@ class CallOpRecvMessage { bool allow_not_getting_message_; }; +namespace CallOpGenericRecvMessageHelper { +class DeserializeFunc { + public: + virtual Status Deserialize(grpc_byte_buffer* buf) = 0; + virtual ~DeserializeFunc() {} +}; + +template <class R> +class DeserializeFuncType final : public DeserializeFunc { + public: + DeserializeFuncType(R* message) : message_(message) {} + Status Deserialize(grpc_byte_buffer* buf) override { + return SerializationTraits<R>::Deserialize(buf, message_); + } + + ~DeserializeFuncType() override {} + + private: + R* message_; // Not a managed pointer because management is external to this +}; +} // namespace CallOpGenericRecvMessageHelper + class CallOpGenericRecvMessage { public: CallOpGenericRecvMessage() @@ -360,9 +395,11 @@ class CallOpGenericRecvMessage { template <class R> void RecvMessage(R* message) { - deserialize_ = [message](grpc_byte_buffer* buf) -> Status { - return SerializationTraits<R>::Deserialize(buf, message); - }; + // Use an explicit base class pointer to avoid resolution error in the + // following unique_ptr::reset for some old implementations. + CallOpGenericRecvMessageHelper::DeserializeFunc* func = + new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message); + deserialize_.reset(func); } // Do not change status if no message is received. @@ -385,7 +422,7 @@ class CallOpGenericRecvMessage { if (recv_buf_) { if (*status) { got_message = true; - *status = deserialize_(recv_buf_).ok(); + *status = deserialize_->Deserialize(recv_buf_).ok(); } else { got_message = false; g_core_codegen_interface->grpc_byte_buffer_destroy(recv_buf_); @@ -396,12 +433,11 @@ class CallOpGenericRecvMessage { *status = false; } } - deserialize_ = DeserializeFunc(); + deserialize_.reset(); } private: - typedef std::function<Status(grpc_byte_buffer*)> DeserializeFunc; - DeserializeFunc deserialize_; + std::unique_ptr<CallOpGenericRecvMessageHelper::DeserializeFunc> deserialize_; grpc_byte_buffer* recv_buf_; bool allow_not_getting_message_; }; @@ -548,14 +584,6 @@ class CallOpClientRecvStatus { grpc_slice error_message_; }; -/// TODO(vjpai): Remove the existence of CallOpSetCollectionInterface -/// and references to it. This code is deprecated-on-arrival and is -/// only added for users that bypassed the code-generator. -class CallOpSetCollectionInterface { - public: - virtual ~CallOpSetCollectionInterface() {} -}; - /// An abstract collection of call ops, used to generate the /// grpc_call_op structure to pass down to the lower layers, /// and as it is-a CompletionQueueTag, also massages the final @@ -566,18 +594,6 @@ class CallOpSetInterface : public CompletionQueueTag { /// Fills in grpc_op, starting from ops[*nops] and moving /// upwards. virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0; - - /// TODO(vjpai): Remove the SetCollection method and comment. This is only - /// a short-term workaround for users that bypassed the code generator - /// Mark this as belonging to a collection if needed - void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) { - collection_ = collection; - } - - protected: - /// TODO(vjpai): Remove the collection_ field once the idea of bypassing the - /// code generator is forbidden. This is already deprecated - std::shared_ptr<CallOpSetCollectionInterface> collection_; }; /// Primary implementaiton of CallOpSetInterface. @@ -618,13 +634,7 @@ class CallOpSet : public CallOpSetInterface, this->Op6::FinishOp(status); *tag = return_tag_; - // TODO(vjpai): Remove the reference to collection_ once the idea of - // bypassing the code generator is forbidden. It is already deprecated - grpc_call* call = call_; - collection_.reset(); - - g_core_codegen_interface->grpc_call_unref(call); - + g_core_codegen_interface->grpc_call_unref(call_); return true; } @@ -682,7 +692,7 @@ class Call final { grpc_call* call_; int max_receive_message_size_; }; -} // namespace internal + } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CALL_H diff --git a/include/grpc++/impl/codegen/call_hook.h b/include/grpc++/impl/codegen/call_hook.h index 44e9de220e..d026cc8b58 100644 --- a/include/grpc++/impl/codegen/call_hook.h +++ b/include/grpc++/impl/codegen/call_hook.h @@ -21,7 +21,6 @@ namespace grpc { -namespace internal { class CallOpSetInterface; class Call; @@ -32,7 +31,6 @@ class CallHook { virtual ~CallHook() {} virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; -} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h index cf1d77e905..1b7590bf0c 100644 --- a/include/grpc++/impl/codegen/channel_interface.h +++ b/include/grpc++/impl/codegen/channel_interface.h @@ -24,8 +24,10 @@ #include <grpc/impl/codegen/connectivity_state.h> namespace grpc { -class ChannelInterface; +class Call; class ClientContext; +class RpcMethod; +class CallOpSetInterface; class CompletionQueue; template <class R> @@ -43,16 +45,6 @@ class ClientAsyncReaderWriter; template <class R> class ClientAsyncResponseReader; -namespace internal { -class Call; -class CallOpSetInterface; -class RpcMethod; -template <class InputMessage, class OutputMessage> -Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const InputMessage& request, - OutputMessage* result); -} // namespace internal - /// Codegen interface for \a grpc::Channel. class ChannelInterface { public: @@ -104,16 +96,15 @@ class ChannelInterface { template <class R> friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status(::grpc::internal::BlockingUnaryCall)( - ChannelInterface* channel, const internal::RpcMethod& method, - ClientContext* context, const InputMessage& request, - OutputMessage* result); - friend class ::grpc::internal::RpcMethod; - virtual internal::Call CreateCall(const internal::RpcMethod& method, - ClientContext* context, - CompletionQueue* cq) = 0; - virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) = 0; + friend Status BlockingUnaryCall(ChannelInterface* channel, + const RpcMethod& method, + ClientContext* context, + const InputMessage& request, + OutputMessage* result); + friend class ::grpc::RpcMethod; + virtual Call CreateCall(const RpcMethod& method, ClientContext* context, + CompletionQueue* cq) = 0; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; virtual void* RegisterMethod(const char* method) = 0; virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, @@ -121,6 +112,7 @@ class ChannelInterface { virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; }; + } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CHANNEL_INTERFACE_H diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index 89eb8b976e..6d7e13bbf2 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -60,18 +60,7 @@ class Channel; class ChannelInterface; class CompletionQueue; class CallCredentials; -class ClientContext; - -namespace internal { class RpcMethod; -class CallOpClientRecvStatus; -class CallOpRecvInitialMetadata; -template <class InputMessage, class OutputMessage> -Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const InputMessage& request, - OutputMessage* result); -} // namespace internal - template <class R> class ClientReader; template <class W> @@ -356,8 +345,8 @@ class ClientContext { ClientContext& operator=(const ClientContext&); friend class ::grpc::testing::InteropClientContextInspector; - friend class ::grpc::internal::CallOpClientRecvStatus; - friend class ::grpc::internal::CallOpRecvInitialMetadata; + friend class CallOpClientRecvStatus; + friend class CallOpRecvInitialMetadata; friend class Channel; template <class R> friend class ::grpc::ClientReader; @@ -374,10 +363,11 @@ class ClientContext { template <class R> friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status(::grpc::internal::BlockingUnaryCall)( - ChannelInterface* channel, const internal::RpcMethod& method, - ClientContext* context, const InputMessage& request, - OutputMessage* result); + friend Status BlockingUnaryCall(ChannelInterface* channel, + const RpcMethod& method, + ClientContext* context, + const InputMessage& request, + OutputMessage* result); grpc_call* call() const { return call_; } void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel); @@ -409,8 +399,8 @@ class ClientContext { mutable std::shared_ptr<const AuthContext> auth_context_; struct census_context* census_context_; std::multimap<grpc::string, grpc::string> send_initial_metadata_; - internal::MetadataMap recv_initial_metadata_; - internal::MetadataMap trailing_metadata_; + MetadataMap recv_initial_metadata_; + MetadataMap trailing_metadata_; grpc_call* propagate_from_call_; PropagationOptions propagation_options_; diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 8fef3ab353..7c540fade9 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -30,9 +30,8 @@ namespace grpc { class Channel; class ClientContext; class CompletionQueue; - -namespace internal { class RpcMethod; + /// Wrapper that performs a blocking unary call template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, @@ -68,7 +67,6 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, return status; } -} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index a04778aa72..ca757e2a9c 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -56,19 +56,7 @@ class ServerWriter; namespace internal { template <class W, class R> class ServerReaderWriterBody; -} // namespace internal - -class Channel; -class ChannelInterface; -class ClientContext; -class CompletionQueue; -class Server; -class ServerBuilder; -class ServerContext; - -namespace internal { -class CompletionQueueTag; -class RpcMethod; +} template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> @@ -78,13 +66,16 @@ class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; class UnknownMethodHandler; -template <class Streamer, bool WriteNeeded> -class TemplatedBidiStreamingHandler; -template <class InputMessage, class OutputMessage> -Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const InputMessage& request, - OutputMessage* result); -} // namespace internal + +class Channel; +class ChannelInterface; +class ClientContext; +class CompletionQueueTag; +class CompletionQueue; +class RpcMethod; +class Server; +class ServerBuilder; +class ServerContext; extern CoreCodegenInterface* g_core_codegen_interface; @@ -205,27 +196,28 @@ class CompletionQueue : private GrpcLibraryCodegen { template <class W, class R> friend class ::grpc::internal::ServerReaderWriterBody; template <class ServiceType, class RequestType, class ResponseType> - friend class ::grpc::internal::RpcMethodHandler; + friend class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ::grpc::internal::ClientStreamingHandler; + friend class ClientStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ::grpc::internal::ServerStreamingHandler; + friend class ServerStreamingHandler; template <class Streamer, bool WriteNeeded> - friend class ::grpc::internal::TemplatedBidiStreamingHandler; - friend class ::grpc::internal::UnknownMethodHandler; + friend class TemplatedBidiStreamingHandler; + friend class UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; template <class InputMessage, class OutputMessage> - friend Status(::grpc::internal::BlockingUnaryCall)( - ChannelInterface* channel, const internal::RpcMethod& method, - ClientContext* context, const InputMessage& request, - OutputMessage* result); + friend Status BlockingUnaryCall(ChannelInterface* channel, + const RpcMethod& method, + ClientContext* context, + const InputMessage& request, + OutputMessage* result); NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. /// \warning Must not be mixed with calls to \a Next. - bool Pluck(internal::CompletionQueueTag* tag) { + bool Pluck(CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( @@ -246,7 +238,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// implementation to simple call the other TryPluck function with a zero /// timeout. i.e: /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) - void TryPluck(internal::CompletionQueueTag* tag) { + void TryPluck(CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); @@ -262,7 +254,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects /// that the tag is internal not something that is returned to the user. - void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) { + void TryPluck(CompletionQueueTag* tag, gpr_timespec deadline) { auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { diff --git a/include/grpc++/impl/codegen/completion_queue_tag.h b/include/grpc++/impl/codegen/completion_queue_tag.h index cb16bcf9ff..4d7d3a98dd 100644 --- a/include/grpc++/impl/codegen/completion_queue_tag.h +++ b/include/grpc++/impl/codegen/completion_queue_tag.h @@ -21,7 +21,6 @@ namespace grpc { -namespace internal { /// An interface allowing implementors to process and filter event tags. class CompletionQueueTag { public: @@ -32,7 +31,6 @@ class CompletionQueueTag { /// queue virtual bool FinalizeResult(void** tag, bool* status) = 0; }; -} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/metadata_map.h b/include/grpc++/impl/codegen/metadata_map.h index fd4750efdd..b73985967d 100644 --- a/include/grpc++/impl/codegen/metadata_map.h +++ b/include/grpc++/impl/codegen/metadata_map.h @@ -23,7 +23,6 @@ namespace grpc { -namespace internal { class MetadataMap { public: MetadataMap() { memset(&arr_, 0, sizeof(arr_)); } @@ -51,7 +50,6 @@ class MetadataMap { grpc_metadata_array arr_; std::multimap<grpc::string_ref, grpc::string_ref> map_; }; -} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 87e9e5e952..15e24bdcdc 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -25,7 +25,6 @@ namespace grpc { -namespace internal { /// A wrapper class of an application provided rpc method handler. template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler : public MethodHandler { @@ -266,7 +265,6 @@ class UnknownMethodHandler : public MethodHandler { } }; -} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index 54e52364ef..ac13ac56c7 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -24,7 +24,7 @@ #include <grpc++/impl/codegen/channel_interface.h> namespace grpc { -namespace internal { + /// Descriptor of an RPC method class RpcMethod { public: @@ -55,7 +55,6 @@ class RpcMethod { void* const channel_tag_; }; -} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h index 635e40469b..7165774172 100644 --- a/include/grpc++/impl/codegen/rpc_service_method.h +++ b/include/grpc++/impl/codegen/rpc_service_method.h @@ -35,8 +35,8 @@ struct grpc_byte_buffer; namespace grpc { class ServerContext; +class StreamContextInterface; -namespace internal { /// Base class for running an RPC handler. class MethodHandler { public: @@ -71,7 +71,6 @@ class RpcServiceMethod : public RpcMethod { void* server_tag_; std::unique_ptr<MethodHandler> handler_; }; -} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index a2d6967bf8..b5e37fd12b 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -55,6 +55,7 @@ class ServerWriter; namespace internal { template <class W, class R> class ServerReaderWriterBody; +} template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> @@ -64,11 +65,9 @@ class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; class UnknownMethodHandler; -template <class Streamer, bool WriteNeeded> -class TemplatedBidiStreamingHandler; -class Call; -} // namespace internal +class Call; +class CallOpBuffer; class CompletionQueue; class Server; class ServerInterface; @@ -248,14 +247,14 @@ class ServerContext { template <class W, class R> friend class ::grpc::internal::ServerReaderWriterBody; template <class ServiceType, class RequestType, class ResponseType> - friend class ::grpc::internal::RpcMethodHandler; + friend class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ::grpc::internal::ClientStreamingHandler; + friend class ClientStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ::grpc::internal::ServerStreamingHandler; + friend class ServerStreamingHandler; template <class Streamer, bool WriteNeeded> - friend class ::grpc::internal::TemplatedBidiStreamingHandler; - friend class ::grpc::internal::UnknownMethodHandler; + friend class TemplatedBidiStreamingHandler; + friend class UnknownMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. @@ -264,9 +263,9 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(internal::Call* call); + void BeginCompletionOp(Call* call); /// Return the tag queued by BeginCompletionOp() - internal::CompletionQueueTag* GetCompletionOpTag(); + CompletionQueueTag* GetCompletionOpTag(); ServerContext(gpr_timespec deadline, grpc_metadata_array* arr); @@ -283,7 +282,7 @@ class ServerContext { CompletionQueue* cq_; bool sent_initial_metadata_; mutable std::shared_ptr<const AuthContext> auth_context_; - internal::MetadataMap client_metadata_; + MetadataMap client_metadata_; std::multimap<grpc::string, grpc::string> initial_metadata_; std::multimap<grpc::string, grpc::string> trailing_metadata_; @@ -291,9 +290,7 @@ class ServerContext { grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; - internal::CallOpSet<internal::CallOpSendInitialMetadata, - internal::CallOpSendMessage> - pending_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> pending_ops_; bool has_pending_ops_; }; diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 3bcf4c87e7..55937f19d7 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -30,21 +30,20 @@ namespace grpc { class AsyncGenericService; class Channel; class GenericServerContext; +class RpcService; +class ServerAsyncStreamingInterface; class ServerCompletionQueue; class ServerContext; class ServerCredentials; class Service; +class ThreadPoolInterface; extern CoreCodegenInterface* g_core_codegen_interface; /// Models a gRPC server. /// /// Servers are configured and started via \a grpc::ServerBuilder. -namespace internal { -class ServerAsyncStreamingInterface; -} // namespace internal - -class ServerInterface : public internal::CallHook { +class ServerInterface : public CallHook { public: virtual ~ServerInterface() {} @@ -79,7 +78,7 @@ class ServerInterface : public internal::CallHook { virtual void Wait() = 0; protected: - friend class ::grpc::Service; + friend class Service; /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -117,13 +116,12 @@ class ServerInterface : public internal::CallHook { virtual grpc_server* server() = 0; - virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) = 0; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; - class BaseAsyncRequest : public internal::CompletionQueueTag { + class BaseAsyncRequest : public CompletionQueueTag { public: BaseAsyncRequest(ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, bool delete_on_finalize); virtual ~BaseAsyncRequest(); @@ -133,7 +131,7 @@ class ServerInterface : public internal::CallHook { protected: ServerInterface* const server_; ServerContext* const context_; - internal::ServerAsyncStreamingInterface* const stream_; + ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; void* const tag_; const bool delete_on_finalize_; @@ -143,7 +141,7 @@ class ServerInterface : public internal::CallHook { class RegisteredAsyncRequest : public BaseAsyncRequest { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag); // uses BaseAsyncRequest::FinalizeResult @@ -157,7 +155,7 @@ class ServerInterface : public internal::CallHook { public: NoPayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { @@ -172,7 +170,7 @@ class ServerInterface : public internal::CallHook { public: PayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* request) @@ -214,7 +212,7 @@ class ServerInterface : public internal::CallHook { void* const registered_method_; ServerInterface* const server_; ServerContext* const context_; - internal::ServerAsyncStreamingInterface* const stream_; + ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; ServerCompletionQueue* const notification_cq_; void* const tag_; @@ -225,7 +223,7 @@ class ServerInterface : public internal::CallHook { class GenericAsyncRequest : public BaseAsyncRequest { public: GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, - internal::ServerAsyncStreamingInterface* stream, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize); @@ -237,9 +235,8 @@ class ServerInterface : public internal::CallHook { }; template <class Message> - void RequestAsyncCall(internal::RpcServiceMethod* method, - ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* message) { @@ -249,9 +246,8 @@ class ServerInterface : public internal::CallHook { message); } - void RequestAsyncCall(internal::RpcServiceMethod* method, - ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { GPR_CODEGEN_ASSERT(method); @@ -260,7 +256,7 @@ class ServerInterface : public internal::CallHook { } void RequestAsyncGenericCall(GenericServerContext* context, - internal::ServerAsyncStreamingInterface* stream, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index 71c3d99d5c..2dc4ea0ea6 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -28,14 +28,13 @@ namespace grpc { +class Call; class CompletionQueue; class Server; class ServerInterface; class ServerCompletionQueue; class ServerContext; -namespace internal { -class Call; class ServerAsyncStreamingInterface { public: virtual ~ServerAsyncStreamingInterface() {} @@ -49,10 +48,9 @@ class ServerAsyncStreamingInterface { virtual void SendInitialMetadata(void* tag) = 0; private: - friend class ::grpc::ServerInterface; + friend class ServerInterface; virtual void BindCall(Call* call) = 0; }; -} // namespace internal /// Desriptor of an RPC service and its various RPC methods class Service { @@ -90,38 +88,40 @@ class Service { protected: template <class Message> void RequestAsyncUnary(int index, ServerContext* context, Message* request, - internal::ServerAsyncStreamingInterface* stream, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncClientStreaming( - int index, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) { + void RequestAsyncClientStreaming(int index, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } template <class Message> - void RequestAsyncServerStreaming( - int index, ServerContext* context, Message* request, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) { + void RequestAsyncServerStreaming(int index, ServerContext* context, + Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncBidiStreaming( - int index, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) { + void RequestAsyncBidiStreaming(int index, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } - void AddMethod(internal::RpcServiceMethod* method) { - methods_.emplace_back(method); - } + void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); } void MarkMethodAsync(int index) { GPR_CODEGEN_ASSERT( @@ -139,7 +139,7 @@ class Service { methods_[index].reset(); } - void MarkMethodStreamed(int index, internal::MethodHandler* streamed_method) { + void MarkMethodStreamed(int index, MethodHandler* streamed_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && "Cannot mark an async or generic method Streamed"); methods_[index]->SetHandler(streamed_method); @@ -148,14 +148,14 @@ class Service { // case of BIDI_STREAMING that has 1 read and 1 write, in that order, // and split server-side streaming is BIDI_STREAMING with 1 read and // any number of writes, in that order. - methods_[index]->SetMethodType(internal::RpcMethod::BIDI_STREAMING); + methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING); } private: friend class Server; friend class ServerInterface; ServerInterface* server_; - std::vector<std::unique_ptr<internal::RpcServiceMethod>> methods_; + std::vector<std::unique_ptr<RpcServiceMethod>> methods_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index f9c8303000..3fa208963d 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -30,7 +30,6 @@ namespace grpc { -namespace internal { /// Common interface for all synchronous client side streaming. class ClientStreamingInterface { public: @@ -63,6 +62,20 @@ class ClientStreamingInterface { virtual Status Finish() = 0; }; +/// Common interface for all synchronous server side streaming. +class ServerStreamingInterface { + public: + virtual ~ServerStreamingInterface() {} + + /// Block to send initial metadata to client. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// The initial metadata that will be sent to the client will be + /// taken from the \a ServerContext associated with the call. + virtual void SendInitialMetadata() = 0; +}; + /// An interface that yields a sequence of messages of type \a R. template <class R> class ReaderInterface { @@ -128,55 +141,16 @@ class WriterInterface { } }; -} // namespace internal - /// Client-side interface for streaming reads of message of type \a R. template <class R> -class ClientReaderInterface : public internal::ClientStreamingInterface, - public internal::ReaderInterface<R> { - public: - /// Block to wait for initial metadata from server. The received metadata - /// can only be accessed after this call returns. Should only be called before - /// the first read. Calling this method is optional, and if it is not called - /// the metadata will be available in ClientContext after the first read. - virtual void WaitForInitialMetadata() = 0; -}; - -/// Client-side interface for streaming writes of message type \a W. -template <class W> -class ClientWriterInterface : public internal::ClientStreamingInterface, - public internal::WriterInterface<W> { - public: - /// Half close writing from the client. (signal that the stream of messages - /// coming from the clinet is complete). - /// Blocks until currently-pending writes are completed. - /// Thread safe with respect to \a ReaderInterface::Read operations only - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; -}; - -/// Client-side interface for bi-directional streaming with -/// client-to-server stream messages of type \a W and -/// server-to-client stream messages of type \a R. -template <class W, class R> -class ClientReaderWriterInterface : public internal::ClientStreamingInterface, - public internal::WriterInterface<W>, - public internal::ReaderInterface<R> { +class ClientReaderInterface : public ClientStreamingInterface, + public ReaderInterface<R> { public: /// Block to wait for initial metadata from server. The received metadata /// can only be accessed after this call returns. Should only be called before /// the first read. Calling this method is optional, and if it is not called /// the metadata will be available in ClientContext after the first read. virtual void WaitForInitialMetadata() = 0; - - /// Half close writing from the client. (signal that the stream of messages - /// coming from the clinet is complete). - /// Blocks until currently-pending writes are completed. - /// Thread-safe with respect to \a ReaderInterface::Read - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; }; /// Synchronous (blocking) client-side API for doing server-streaming RPCs, @@ -185,14 +159,28 @@ class ClientReaderWriterInterface : public internal::ClientStreamingInterface, template <class R> class ClientReader final : public ClientReaderInterface<R> { public: - struct internal { - template <class W> - static ClientReader* Create(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, const W& request) { - return new ClientReader(channel, method, context, request); - } - }; + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial + /// metadata used to send to the server when starting the call. + template <class W> + ClientReader(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -204,8 +192,7 @@ class ClientReader final : public ClientReaderInterface<R> { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - ops; + CallOpSet<CallOpRecvInitialMetadata> ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); /// status ignored @@ -222,9 +209,7 @@ class ClientReader final : public ClientReaderInterface<R> { /// already received (if initial metadata is received, it can be then /// accessed through the \a ClientContext associated with this call). bool Read(R* msg) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpRecvMessage<R>> - ops; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -239,7 +224,7 @@ class ClientReader final : public ClientReaderInterface<R> { /// The \a ClientContext associated with this call is updated with /// possible metadata received from the server. Status Finish() override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; + CallOpSet<CallOpClientRecvStatus> ops; Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); @@ -250,48 +235,53 @@ class ClientReader final : public ClientReaderInterface<R> { private: ClientContext* context_; CompletionQueue cq_; - ::grpc::internal::Call call_; + Call call_; +}; - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial - /// metadata used to send to the server when starting the call. - template <class W> - ClientReader(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> - ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } +/// Client-side interface for streaming writes of message type \a W. +template <class W> +class ClientWriterInterface : public ClientStreamingInterface, + public WriterInterface<W> { + public: + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread safe with respect to \a ReaderInterface::Read operations only + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; }; /// Synchronous (blocking) client-side API for doing client-streaming RPCs, /// where the outgoing message stream coming from the client has messages of /// type \a W. template <class W> -class ClientWriter final : public ClientWriterInterface<W> { +class ClientWriter : public ClientWriterInterface<W> { public: - struct internal { - template <class R> - static ClientWriter* Create(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, R* response) { - return new ClientWriter(channel, method, context, response); + /// Block to create a stream (i.e. send request headers and other initial + /// metadata to the server). Note that \a context will be used to fill + /// in custom initial metadata. \a response will be filled in with the + /// single expected response message from the server upon a successful + /// call to the \a Finish method of this instance. + template <class R> + ClientWriter(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, R* response) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + + if (!context_->initial_metadata_corked_) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } - }; + } /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -302,8 +292,7 @@ class ClientWriter final : public ClientWriterInterface<W> { void WaitForInitialMetadata() { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - ops; + CallOpSet<CallOpRecvInitialMetadata> ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -315,11 +304,10 @@ class ClientWriter final : public ClientWriterInterface<W> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call). - using ::grpc::internal::WriterInterface<W>::Write; + using WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -340,7 +328,7 @@ class ClientWriter final : public ClientWriterInterface<W> { } bool WritesDone() override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; + CallOpSet<CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -365,55 +353,61 @@ class ClientWriter final : public ClientWriterInterface<W> { private: ClientContext* context_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpGenericRecvMessage, - ::grpc::internal::CallOpClientRecvStatus> + CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, + CallOpClientRecvStatus> finish_ops_; CompletionQueue cq_; - ::grpc::internal::Call call_; + Call call_; +}; - /// Block to create a stream (i.e. send request headers and other initial - /// metadata to the server). Note that \a context will be used to fill - /// in custom initial metadata. \a response will be filled in with the - /// single expected response message from the server upon a successful - /// call to the \a Finish method of this instance. - template <class R> - ClientWriter(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, R* response) +/// Client-side interface for bi-directional streaming with +/// client-to-server stream messages of type \a W and +/// server-to-client stream messages of type \a R. +template <class W, class R> +class ClientReaderWriterInterface : public ClientStreamingInterface, + public WriterInterface<W>, + public ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; + + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread-safe with respect to \a ReaderInterface::Read + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W, and the incoming messages stream coming from the server has +/// messages of type \a R. +template <class W, class R> +class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { + public: + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context) : context_(context), cq_(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - if (!context_->initial_metadata_corked_) { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - ops; + CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); call_.PerformOps(&ops); cq_.Pluck(&ops); } } -}; - -/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, -/// where the outgoing message stream coming from the client has messages of -/// type \a W, and the incoming messages stream coming from the server has -/// messages of type \a R. -template <class W, class R> -class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { - public: - struct internal { - static ClientReaderWriter* Create(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context) { - return new ClientReaderWriter(channel, method, context); - } - }; /// Block waiting to read initial metadata from the server. /// This call is optional, but if it is used, it cannot be used concurrently @@ -424,8 +418,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - ops; + CallOpSet<CallOpRecvInitialMetadata> ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -441,9 +434,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Also receives initial metadata if not already received (updates the \a /// ClientContext associated with this call in that case). bool Read(R* msg) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpRecvMessage<R>> - ops; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -457,11 +448,10 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using ::grpc::internal::WriterInterface<W>::Write; + using WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -482,7 +472,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { } bool WritesDone() override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; + CallOpSet<CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -494,9 +484,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// - the \a ClientContext associated with this call is updated with /// possible trailing metadata sent from the server. Status Finish() override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpClientRecvStatus> - ops; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -510,61 +498,13 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { private: ClientContext* context_; CompletionQueue cq_; - ::grpc::internal::Call call_; - - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - ClientReaderWriter(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - if (!context_->initial_metadata_corked_) { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - } + Call call_; }; -namespace internal { -/// Common interface for all synchronous server side streaming. -class ServerStreamingInterface { - public: - virtual ~ServerStreamingInterface() {} - - /// Block to send initial metadata to client. - /// This call is optional, but if it is used, it cannot be used concurrently - /// with or after the \a Finish method. - /// - /// The initial metadata that will be sent to the client will be - /// taken from the \a ServerContext associated with the call. - virtual void SendInitialMetadata() = 0; -}; -} // namespace internal - /// Server-side interface for streaming reads of message of type \a R. template <class R> -class ServerReaderInterface : public internal::ServerStreamingInterface, - public internal::ReaderInterface<R> {}; - -/// Server-side interface for streaming writes of message of type \a W. -template <class W> -class ServerWriterInterface : public internal::ServerStreamingInterface, - public internal::WriterInterface<W> {}; - -/// Server-side interface for bi-directional streaming. -template <class W, class R> -class ServerReaderWriterInterface : public internal::ServerStreamingInterface, - public internal::WriterInterface<W>, - public internal::ReaderInterface<R> {}; +class ServerReaderInterface : public ServerStreamingInterface, + public ReaderInterface<R> {}; /// Synchronous (blocking) server-side API for doing client-streaming RPCs, /// where the incoming message stream coming from the client has messages of @@ -572,13 +512,15 @@ class ServerReaderWriterInterface : public internal::ServerStreamingInterface, template <class R> class ServerReader final : public ServerReaderInterface<R> { public: + ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; + CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -595,29 +537,30 @@ class ServerReader final : public ServerReaderInterface<R> { } bool Read(R* msg) override { - internal::CallOpSet<internal::CallOpRecvMessage<R>> ops; + CallOpSet<CallOpRecvMessage<R>> ops; ops.RecvMessage(msg); call_->PerformOps(&ops); return call_->cq()->Pluck(&ops) && ops.got_message; } private: - internal::Call* const call_; + Call* const call_; ServerContext* const ctx_; - - template <class ServiceType, class RequestType, class ResponseType> - friend class internal::ClientStreamingHandler; - - ServerReader(internal::Call* call, ServerContext* ctx) - : call_(call), ctx_(ctx) {} }; +/// Server-side interface for streaming writes of message of type \a W. +template <class W> +class ServerWriterInterface : public ServerStreamingInterface, + public WriterInterface<W> {}; + /// Synchronous (blocking) server-side API for doing for doing a /// server-streaming RPCs, where the outgoing message stream coming from the /// server has messages of type \a W. template <class W> class ServerWriter final : public ServerWriterInterface<W> { public: + ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. /// Note that initial metadata will be affected by the @@ -625,7 +568,7 @@ class ServerWriter final : public ServerWriterInterface<W> { void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; + CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -641,12 +584,11 @@ class ServerWriter final : public ServerWriterInterface<W> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using internal::WriterInterface<W>::Write; + using WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { if (options.is_last_message()) { options.set_buffer_hint(); } - if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { return false; } @@ -671,16 +613,16 @@ class ServerWriter final : public ServerWriterInterface<W> { } private: - internal::Call* const call_; + Call* const call_; ServerContext* const ctx_; - - template <class ServiceType, class RequestType, class ResponseType> - friend class internal::ServerStreamingHandler; - - ServerWriter(internal::Call* call, ServerContext* ctx) - : call_(call), ctx_(ctx) {} }; +/// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriterInterface : public ServerStreamingInterface, + public WriterInterface<W>, + public ReaderInterface<R> {}; + /// Actual implementation of bi-directional streaming namespace internal { template <class W, class R> @@ -746,7 +688,6 @@ class ServerReaderWriterBody final { Call* const call_; ServerContext* const ctx_; }; - } // namespace internal /// Synchronous (blocking) server-side API for a bidirectional @@ -756,6 +697,8 @@ class ServerReaderWriterBody final { template <class W, class R> class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { public: + ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {} + /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. @@ -772,18 +715,13 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { /// Side effect: /// Also sends initial metadata if not already sent (using the \a /// ServerContext associated with this call). - using internal::WriterInterface<W>::Write; + using WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { return body_.Write(msg, options); } private: internal::ServerReaderWriterBody<W, R> body_; - - friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, - false>; - ServerReaderWriter(internal::Call* call, ServerContext* ctx) - : body_(call, ctx) {} }; /// A class to represent a flow-controlled unary call. This is something @@ -798,6 +736,9 @@ template <class RequestType, class ResponseType> class ServerUnaryStreamer final : public ServerReaderWriterInterface<ResponseType, RequestType> { public: + ServerUnaryStreamer(Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false), write_done_(false) {} + /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -834,7 +775,7 @@ class ServerUnaryStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using internal::WriterInterface<ResponseType>::Write; + using WriterInterface<ResponseType>::Write; bool Write(const ResponseType& response, WriteOptions options) override { if (write_done_ || !read_done_) { return false; @@ -847,11 +788,6 @@ class ServerUnaryStreamer final internal::ServerReaderWriterBody<ResponseType, RequestType> body_; bool read_done_; bool write_done_; - - friend class internal::TemplatedBidiStreamingHandler< - ServerUnaryStreamer<RequestType, ResponseType>, true>; - ServerUnaryStreamer(internal::Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false), write_done_(false) {} }; /// A class to represent a flow-controlled server-side streaming call. @@ -863,6 +799,9 @@ template <class RequestType, class ResponseType> class ServerSplitStreamer final : public ServerReaderWriterInterface<ResponseType, RequestType> { public: + ServerSplitStreamer(Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} + /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -899,7 +838,7 @@ class ServerSplitStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using internal::WriterInterface<ResponseType>::Write; + using WriterInterface<ResponseType>::Write; bool Write(const ResponseType& response, WriteOptions options) override { return read_done_ && body_.Write(response, options); } @@ -907,11 +846,6 @@ class ServerSplitStreamer final private: internal::ServerReaderWriterBody<ResponseType, RequestType> body_; bool read_done_; - - friend class internal::TemplatedBidiStreamingHandler< - ServerSplitStreamer<RequestType, ResponseType>, false>; - ServerSplitStreamer(internal::Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false) {} }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/time.h b/include/grpc++/impl/codegen/time.h index d464d6ea13..589deb4f03 100644 --- a/include/grpc++/impl/codegen/time.h +++ b/include/grpc++/impl/codegen/time.h @@ -19,8 +19,6 @@ #ifndef GRPCXX_IMPL_CODEGEN_TIME_H #define GRPCXX_IMPL_CODEGEN_TIME_H -#include <chrono> - #include <grpc++/impl/codegen/config.h> #include <grpc/impl/codegen/grpc_types.h> @@ -61,6 +59,10 @@ class TimePoint<gpr_timespec> { } // namespace grpc +#include <chrono> + +#include <grpc/impl/codegen/grpc_types.h> + namespace grpc { // from and to should be absolute time. diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 4684b10fe6..0a3aae8241 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -159,7 +159,7 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// /// \param addr The address to try to bind to the server (eg, localhost:1234, /// 192.168.1.1:31416, [::1]:27182, etc.). - /// \params creds The credentials associated with the server. + /// \param creds The credentials associated with the server. /// /// \return bound port number on success, 0 on failure. /// @@ -175,8 +175,7 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// \param num_cqs How many completion queues does \a cqs hold. void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; - void PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) override; + void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; void ShutdownInternal(gpr_timespec deadline) override; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 5bd53bd90f..eafd63619d 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -40,6 +40,7 @@ namespace grpc { class AsyncGenericService; class ResourceQuota; class CompletionQueue; +class RpcService; class Server; class ServerCompletionQueue; class ServerCredentials; diff --git a/include/grpc++/support/slice.h b/include/grpc++/support/slice.h index 0b4ba7cecb..bbf97f280e 100644 --- a/include/grpc++/support/slice.h +++ b/include/grpc++/support/slice.h @@ -67,6 +67,20 @@ class Slice final { return *this; } + /// Create a slice pointing at some data. Calls malloc to allocate a refcount + /// for the object, and arranges that destroy will be called with the + /// user data pointer passed in at destruction. Can be the same as buf or + /// different (e.g., if data is part of a larger structure that must be + /// destroyed when the data is no longer needed) + Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data); + + /// Specialization of above for common case where buf == user_data + Slice(void* buf, size_t len, void (*destroy)(void*)) + : Slice(buf, len, destroy, buf) {} + + /// Similar to the above but has a destroy that also takes slice length + Slice(void* buf, size_t len, void (*destroy)(void*, size_t)); + /// Byte size. size_t size() const { return GRPC_SLICE_LENGTH(slice_); } diff --git a/include/grpc/compression.h b/include/grpc/compression.h index d47074c9b7..3a8de4b7b8 100644 --- a/include/grpc/compression.h +++ b/include/grpc/compression.h @@ -30,25 +30,42 @@ extern "C" { #endif -/** Parses the first \a name_length bytes of \a name as a - * grpc_compression_algorithm instance, updating \a algorithm. Returns 1 upon - * success, 0 otherwise. */ +/** Parses the \a slice as a grpc_compression_algorithm instance and updating \a + * algorithm. Returns 1 upon success, 0 otherwise. */ GRPCAPI int grpc_compression_algorithm_parse( grpc_slice value, grpc_compression_algorithm *algorithm); +/** Parses the \a slice as a grpc_stream_compression_algorithm instance and + * updating \a algorithm. Returns 1 upon success, 0 otherwise. */ +int grpc_stream_compression_algorithm_parse( + grpc_slice name, grpc_stream_compression_algorithm *algorithm); + /** Updates \a name with the encoding name corresponding to a valid \a * algorithm. Note that \a name is statically allocated and must *not* be freed. * Returns 1 upon success, 0 otherwise. */ GRPCAPI int grpc_compression_algorithm_name( grpc_compression_algorithm algorithm, char **name); +/** Updates \a name with the encoding name corresponding to a valid \a + * algorithm. Note that \a name is statically allocated and must *not* be freed. + * Returns 1 upon success, 0 otherwise. */ +GRPCAPI int grpc_stream_compression_algorithm_name( + grpc_stream_compression_algorithm algorithm, char **name); + /** Returns the compression algorithm corresponding to \a level for the * compression algorithms encoded in the \a accepted_encodings bitset. * - * It abort()s for unknown levels . */ + * It abort()s for unknown levels. */ GRPCAPI grpc_compression_algorithm grpc_compression_algorithm_for_level( grpc_compression_level level, uint32_t accepted_encodings); +/** Returns the stream compression algorithm corresponding to \a level for the + * compression algorithms encoded in the \a accepted_stream_encodings bitset. + * It abort()s for unknown levels. */ +GRPCAPI grpc_stream_compression_algorithm +grpc_stream_compression_algorithm_for_level(grpc_stream_compression_level level, + uint32_t accepted_stream_encodings); + GRPCAPI void grpc_compression_options_init(grpc_compression_options *opts); /** Mark \a algorithm as enabled in \a opts. */ @@ -63,6 +80,11 @@ GRPCAPI void grpc_compression_options_disable_algorithm( GRPCAPI int grpc_compression_options_is_algorithm_enabled( const grpc_compression_options *opts, grpc_compression_algorithm algorithm); +/** Returns true if \a algorithm is marked as enabled in \a opts. */ +GRPCAPI int grpc_compression_options_is_stream_compression_algorithm_enabled( + const grpc_compression_options *opts, + grpc_stream_compression_algorithm algorithm); + #ifdef __cplusplus } #endif diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 2cf8de0a2d..943d6e4891 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -296,7 +296,11 @@ GRPCAPI grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved); If a status has not been received for the call, set it to the status code and description passed in. Importantly, this function does not send status nor description to the - remote endpoint. */ + remote endpoint. + Note that \a description doesn't need be a static string. + It doesn't need to be alive after the call to + grpc_call_cancel_with_status completes. + */ GRPCAPI grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description, diff --git a/include/grpc/impl/codegen/atm.h b/include/grpc/impl/codegen/atm.h index c3b528be4d..2cfd22ab63 100644 --- a/include/grpc/impl/codegen/atm.h +++ b/include/grpc/impl/codegen/atm.h @@ -60,6 +60,7 @@ int gpr_atm_no_barrier_cas(gpr_atm *p, gpr_atm o, gpr_atm n); int gpr_atm_acq_cas(gpr_atm *p, gpr_atm o, gpr_atm n); int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n); + int gpr_atm_full_cas(gpr_atm *p, gpr_atm o, gpr_atm n); // Atomically, set *p=n and return the old value of *p gpr_atm gpr_atm_full_xchg(gpr_atm *p, gpr_atm n); diff --git a/include/grpc/impl/codegen/byte_buffer_reader.h b/include/grpc/impl/codegen/byte_buffer_reader.h index 2ae3f1e20e..dc0f15496f 100644 --- a/include/grpc/impl/codegen/byte_buffer_reader.h +++ b/include/grpc/impl/codegen/byte_buffer_reader.h @@ -29,7 +29,7 @@ struct grpc_byte_buffer_reader { struct grpc_byte_buffer *buffer_in; struct grpc_byte_buffer *buffer_out; /** Different current objects correspond to different types of byte buffers */ - union { + union grpc_byte_buffer_reader_current { /** Index into a slice buffer's array of slices */ unsigned index; } current; diff --git a/include/grpc/impl/codegen/compression_types.h b/include/grpc/impl/codegen/compression_types.h index e39c13e88d..4419e2a447 100644 --- a/include/grpc/impl/codegen/compression_types.h +++ b/include/grpc/impl/codegen/compression_types.h @@ -29,6 +29,11 @@ extern "C" { * algorithm */ #define GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY \ "grpc-internal-encoding-request" +/** To be used as initial metadata key for the request of a concrete stream + * compression + * algorithm */ +#define GRPC_STREAM_COMPRESSION_REQUEST_ALGORITHM_MD_KEY \ + "grpc-internal-stream-encoding-request" /** To be used in channel arguments. * @@ -38,9 +43,17 @@ extern "C" { * Its value is an int from the \a grpc_compression_algorithm enum. */ #define GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM \ "grpc.default_compression_algorithm" +/** Default stream compression algorithm for the channel. + * Its value is an int from the \a grpc_stream_compression_algorithm enum. */ +#define GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM \ + "grpc.default_stream_compression_algorithm" /** Default compression level for the channel. * Its value is an int from the \a grpc_compression_level enum. */ #define GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL "grpc.default_compression_level" +/** Default stream compression level for the channel. + * Its value is an int from the \a grpc_stream_compression_level enum. */ +#define GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_LEVEL \ + "grpc.default_stream_compression_level" /** Compression algorithms supported by the channel. * Its value is a bitset (an int). Bits correspond to algorithms in \a * grpc_compression_algorithm. For example, its LSB corresponds to @@ -50,6 +63,15 @@ extern "C" { * be ignored). */ #define GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET \ "grpc.compression_enabled_algorithms_bitset" +/** Stream compression algorithms supported by the channel. + * Its value is a bitset (an int). Bits correspond to algorithms in \a + * grpc_stream_compression_algorithm. For example, its LSB corresponds to + * GRPC_STREAM_COMPRESS_NONE, the next bit to GRPC_STREAM_COMPRESS_DEFLATE, etc. + * Unset bits disable support for the algorithm. By default all algorithms are + * supported. It's not possible to disable GRPC_STREAM_COMPRESS_NONE (the + * attempt will be ignored). */ +#define GRPC_STREAM_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET \ + "grpc.stream_compression_enabled_algorithms_bitset" /** \} */ /** The various compression algorithms supported by gRPC */ @@ -61,6 +83,13 @@ typedef enum { GRPC_COMPRESS_ALGORITHMS_COUNT } grpc_compression_algorithm; +/** Stream compresssion algorithms supported by gRPC */ +typedef enum { + GRPC_STREAM_COMPRESS_NONE = 0, + GRPC_STREAM_COMPRESS_GZIP, + GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT +} grpc_stream_compression_algorithm; + /** Compression levels allow a party with knowledge of its peer's accepted * encodings to request compression in an abstract way. The level-algorithm * mapping is performed internally and depends on the peer's supported @@ -73,30 +102,59 @@ typedef enum { GRPC_COMPRESS_LEVEL_COUNT } grpc_compression_level; +/** Compression levels for stream compression algorithms */ +typedef enum { + GRPC_STREAM_COMPRESS_LEVEL_NONE = 0, + GRPC_STREAM_COMPRESS_LEVEL_LOW, + GRPC_STREAM_COMPRESS_LEVEL_MED, + GRPC_STREAM_COMPRESS_LEVEL_HIGH, + GRPC_STREAM_COMPRESS_LEVEL_COUNT +} grpc_stream_compression_level; + typedef struct grpc_compression_options { /** All algs are enabled by default. This option corresponds to the channel * argument key behind \a GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET */ uint32_t enabled_algorithms_bitset; + uint32_t enabled_stream_compression_algorithms_bitset; - /** The default channel compression level. It'll be used in the absence of - * call specific settings. This option corresponds to the channel argument key - * behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL. If present, takes - * precedence over \a default_algorithm. + /** The default message-wise compression level. It'll be used in the absence + * of * call specific settings. This option corresponds to the channel + * argument key behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL. If present, + * takes precedence over \a default_algorithm and \a + * default_stream_compression_algorithm. * TODO(dgq): currently only available for server channels. */ - struct { + struct grpc_compression_options_default_level { int is_set; grpc_compression_level level; } default_level; - /** The default channel compression algorithm. It'll be used in the absence of + /** The default stream compression level. It'll be used in the absence of call + * specefic settings. If present, takes precedence over \a default_level, + * \a default_algorithm and \a default_stream_compression_algorithm. */ + struct grpc_stream_compression_options_default_level { + int is_set; + grpc_stream_compression_level level; + } default_stream_compression_level; + + /** The default message compression algorithm. It'll be used in the absence of * call specific settings. This option corresponds to the channel argument key * behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM. */ - struct { + struct grpc_compression_options_default_algorithm { int is_set; grpc_compression_algorithm algorithm; } default_algorithm; + /** The default stream compression algorithm. It'll be used in the absence of + * call specific settings. If present, takes precedence over \a + * default_algorithm. This option corresponds to the channel + * argument key behind \a GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM. + */ + struct grpc_stream_compression_options_default_algorithm { + int is_set; + grpc_stream_compression_algorithm algorithm; + } default_stream_compression_algorithm; + } grpc_compression_options; #ifdef __cplusplus diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index d2950e168e..6955060ddd 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -41,11 +41,11 @@ typedef enum { typedef struct grpc_byte_buffer { void *reserved; grpc_byte_buffer_type type; - union { - struct { + union grpc_byte_buffer_data { + struct /* internal */ { void *reserved[8]; } reserved; - struct { + struct grpc_compressed_buffer { grpc_compression_algorithm compression; grpc_slice_buffer slice_buffer; } raw; @@ -104,10 +104,10 @@ typedef struct grpc_arg_pointer_vtable { typedef struct { grpc_arg_type type; char *key; - union { + union grpc_arg_value { char *string; int integer; - struct { + struct grpc_arg_pointer { void *p; const grpc_arg_pointer_vtable *vtable; } pointer; @@ -258,8 +258,12 @@ typedef struct { #define GRPC_ARG_RESOURCE_QUOTA "grpc.resource_quota" /** If non-zero, expand wildcard addresses to a list of local addresses. */ #define GRPC_ARG_EXPAND_WILDCARD_ADDRS "grpc.expand_wildcard_addrs" -/** Service config data in JSON form. Not intended for use outside of tests. */ +/** Service config data in JSON form. + This value will be ignored if the name resolver returns a service config. */ #define GRPC_ARG_SERVICE_CONFIG "grpc.service_config" +/** Disable looking up the service config via the name resolver. */ +#define GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION \ + "grpc.service_config_disable_resolution" /** LB policy name. */ #define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name" /** The grpc_socket_mutator instance that set the socket options. A pointer. */ @@ -333,7 +337,9 @@ typedef enum grpc_call_error { /** this batch of operations leads to more operations than allowed */ GRPC_CALL_ERROR_BATCH_TOO_BIG, /** payload type requested is not the type registered */ - GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH + GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH, + /** completion queue has been shutdown */ + GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN } grpc_call_error; /** Default send/receive message size limits in bytes. -1 for unlimited. */ @@ -388,7 +394,7 @@ typedef struct grpc_metadata { /** The following fields are reserved for grpc internal use. There is no need to initialize them, and they will be set to garbage during calls to grpc. */ - struct { + struct /* internal */ { void *obfuscated[4]; } internal_data; } grpc_metadata; @@ -488,25 +494,29 @@ typedef struct grpc_op { uint32_t flags; /** Reserved for future usage */ void *reserved; - union { + union grpc_op_data { /** Reserved for future usage */ - struct { + struct /* internal */ { void *reserved[8]; } reserved; - struct { + struct grpc_op_send_initial_metadata { size_t count; grpc_metadata *metadata; /** If \a is_set, \a compression_level will be used for the call. * Otherwise, \a compression_level won't be considered */ - struct { + struct grpc_op_send_initial_metadata_maybe_compression_level { uint8_t is_set; grpc_compression_level level; } maybe_compression_level; + struct grpc_op_send_initial_metadata_maybe_stream_compression_level { + uint8_t is_set; + grpc_stream_compression_level level; + } maybe_stream_compression_level; } send_initial_metadata; - struct { + struct grpc_op_send_message { struct grpc_byte_buffer *send_message; } send_message; - struct { + struct grpc_op_send_status_from_server { size_t trailing_metadata_count; grpc_metadata *trailing_metadata; grpc_status_code status; @@ -520,16 +530,16 @@ typedef struct grpc_op { object, recv_initial_metadata->array is owned by the caller). After the operation completes, call grpc_metadata_array_destroy on this value, or reuse it in a future op. */ - struct { + struct grpc_op_recv_initial_metadata { grpc_metadata_array *recv_initial_metadata; } recv_initial_metadata; /** ownership of the byte buffer is moved to the caller; the caller must call grpc_byte_buffer_destroy on this value, or reuse it in a future op. */ - struct { + struct grpc_op_recv_message { struct grpc_byte_buffer **recv_message; } recv_message; - struct { + struct grpc_op_recv_status_on_client { /** ownership of the array is with the caller, but ownership of the elements stays with the call object (ie key, value members are owned by the call object, trailing_metadata->array is owned by the caller). @@ -539,7 +549,7 @@ typedef struct grpc_op { grpc_status_code *status; grpc_slice *status_details; } recv_status_on_client; - struct { + struct grpc_op_recv_close_on_server { /** out argument, set to 1 if the call failed in any way (seen as a cancellation on the server), or 0 if the call succeeded */ int *cancelled; diff --git a/include/grpc/impl/codegen/slice.h b/include/grpc/impl/codegen/slice.h index 5ec439eb37..a04c683a55 100644 --- a/include/grpc/impl/codegen/slice.h +++ b/include/grpc/impl/codegen/slice.h @@ -75,12 +75,12 @@ typedef struct grpc_slice_refcount { of data that is copied by value. */ struct grpc_slice { struct grpc_slice_refcount *refcount; - union { - struct { + union grpc_slice_data { + struct grpc_slice_refcounted { uint8_t *bytes; size_t length; } refcounted; - struct { + struct grpc_slice_inlined { uint8_t length; uint8_t bytes[GRPC_SLICE_INLINED_SIZE]; } inlined; diff --git a/include/grpc/impl/codegen/sync.h b/include/grpc/impl/codegen/sync.h index de4e99be63..6cdb0c5153 100644 --- a/include/grpc/impl/codegen/sync.h +++ b/include/grpc/impl/codegen/sync.h @@ -49,7 +49,9 @@ extern "C" { #include <grpc/impl/codegen/sync_posix.h> #elif defined(GPR_WINDOWS) #include <grpc/impl/codegen/sync_windows.h> -#elif !defined(GPR_CUSTOM_SYNC) +#elif defined(GPR_CUSTOM_SYNC) +#include <grpc/impl/codegen/sync_custom.h> +#else #error Unable to determine platform for sync #endif diff --git a/include/grpc/impl/codegen/sync_custom.h b/include/grpc/impl/codegen/sync_custom.h new file mode 100644 index 0000000000..0840ad26bf --- /dev/null +++ b/include/grpc/impl/codegen/sync_custom.h @@ -0,0 +1,36 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_IMPL_CODEGEN_SYNC_CUSTOM_H +#define GRPC_IMPL_CODEGEN_SYNC_CUSTOM_H + +#include <grpc/impl/codegen/sync_generic.h> + +/* Users defining GPR_CUSTOM_SYNC need to define the following macros. */ + +#ifdef GPR_CUSTOM_SYNC + +typedef GPR_CUSTOM_MU_TYPE gpr_mu; +typedef GPR_CUSTOM_CV_TYPE gpr_cv; +typedef GPR_CUSTOM_ONCE_TYPE gpr_once; + +#define GPR_ONCE_INIT GPR_CUSTOM_ONCE_INIT + +#endif + +#endif /* GRPC_IMPL_CODEGEN_SYNC_CUSTOM_H */ diff --git a/include/grpc/support/avl.h b/include/grpc/support/avl.h index ed052e8222..d53ff5d904 100644 --- a/include/grpc/support/avl.h +++ b/include/grpc/support/avl.h @@ -31,18 +31,23 @@ typedef struct gpr_avl_node { long height; } gpr_avl_node; +/** vtable for the AVL tree + * The optional user_data is propagated from the top level gpr_avl_XXX API. + * From the same API call, multiple vtable functions may be called multiple + * times. + */ typedef struct gpr_avl_vtable { /** destroy a key */ - void (*destroy_key)(void *key); + void (*destroy_key)(void *key, void *user_data); /** copy a key, returning new value */ - void *(*copy_key)(void *key); + void *(*copy_key)(void *key, void *user_data); /** compare key1, key2; return <0 if key1 < key2, >0 if key1 > key2, 0 if key1 == key2 */ - long (*compare_keys)(void *key1, void *key2); + long (*compare_keys)(void *key1, void *key2, void *user_data); /** destroy a value */ - void (*destroy_value)(void *value); + void (*destroy_value)(void *value, void *user_data); /** copy a value */ - void *(*copy_value)(void *value); + void *(*copy_value)(void *value, void *user_data); } gpr_avl_vtable; /** "pointer" to an AVL tree - this is a reference @@ -53,29 +58,36 @@ typedef struct gpr_avl { gpr_avl_node *root; } gpr_avl; -/** create an immutable AVL tree */ +/** Create an immutable AVL tree. */ GPRAPI gpr_avl gpr_avl_create(const gpr_avl_vtable *vtable); -/** add a reference to an existing tree - returns - the tree as a convenience */ -GPRAPI gpr_avl gpr_avl_ref(gpr_avl avl); -/** remove a reference to a tree - destroying it if there - are no references left */ -GPRAPI void gpr_avl_unref(gpr_avl avl); -/** return a new tree with (key, value) added to avl. +/** Add a reference to an existing tree - returns + the tree as a convenience. The optional user_data will be passed to vtable + functions. */ +GPRAPI gpr_avl gpr_avl_ref(gpr_avl avl, void *user_data); +/** Remove a reference to a tree - destroying it if there + are no references left. The optional user_data will be passed to vtable + functions. */ +GPRAPI void gpr_avl_unref(gpr_avl avl, void *user_data); +/** Return a new tree with (key, value) added to avl. implicitly unrefs avl to allow easy chaining. if key exists in avl, the new tree's key entry updated - (i.e. a duplicate is not created) */ -GPRAPI gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value); -/** return a new tree with key deleted - implicitly unrefs avl to allow easy chaining. */ -GPRAPI gpr_avl gpr_avl_remove(gpr_avl avl, void *key); -/** lookup key, and return the associated value. - does not mutate avl. - returns NULL if key is not found. */ -GPRAPI void *gpr_avl_get(gpr_avl avl, void *key); + (i.e. a duplicate is not created). The optional user_data will be passed to + vtable functions. */ +GPRAPI gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value, + void *user_data); +/** Return a new tree with key deleted + implicitly unrefs avl to allow easy chaining. The optional user_data will be + passed to vtable functions. */ +GPRAPI gpr_avl gpr_avl_remove(gpr_avl avl, void *key, void *user_data); +/** Lookup key, and return the associated value. + Does not mutate avl. + Returns NULL if key is not found. The optional user_data will be passed to + vtable functions.*/ +GPRAPI void *gpr_avl_get(gpr_avl avl, void *key, void *user_data); /** Return 1 if avl contains key, 0 otherwise; if it has the key, sets *value to - its value*/ -GPRAPI int gpr_avl_maybe_get(gpr_avl avl, void *key, void **value); + its value. THe optional user_data will be passed to vtable functions. */ +GPRAPI int gpr_avl_maybe_get(gpr_avl avl, void *key, void **value, + void *user_data); /** Return 1 if avl is empty, 0 otherwise */ GPRAPI int gpr_avl_is_empty(gpr_avl avl); diff --git a/include/grpc/support/sync_custom.h b/include/grpc/support/sync_custom.h new file mode 100644 index 0000000000..b575f5e002 --- /dev/null +++ b/include/grpc/support/sync_custom.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_SUPPORT_SYNC_CUSTOM_H +#define GRPC_SUPPORT_SYNC_CUSTOM_H + +#include <grpc/impl/codegen/sync_custom.h> + +#endif /* GRPC_SUPPORT_SYNC_CUSTOM_H */ |