diff options
Diffstat (limited to 'include/grpc++/impl/codegen/async_stream.h')
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 57 |
1 files changed, 45 insertions, 12 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index a607a47106..e96d224ddb 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -52,11 +52,14 @@ class ClientAsyncStreamingInterface { /// Request notification of the reading of the initial metadata. Completion /// will be notified by \a tag on the associated completion queue. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Read method. /// /// \param[in] tag Tag identifying this request. virtual void ReadInitialMetadata(void* tag) = 0; - /// Request notification completion. + /// Indicate that the stream is to be finished and request notification + /// Should not be used concurrently with other operations /// /// \param[out] status To be updated with the operation status. /// \param[in] tag Tag identifying this request. @@ -71,6 +74,11 @@ class AsyncReaderInterface { /// Read a message of type \a R into \a msg. Completion will be notified by \a /// tag on the associated completion queue. + /// This is thread-safe with respect to \a Write or \a WritesDone methods. It + /// should not be called concurrently with other streaming APIs + /// on the same stream. It is not meaningful to call it concurrently + /// with another \a Read on the same stream since reads on the same stream + /// are delivered in order. /// /// \param[out] msg Where to eventually store the read message. /// \param[in] tag The tag identifying the operation. @@ -88,6 +96,7 @@ class AsyncWriterInterface { /// Only one write may be outstanding at any given time. This means that /// after calling Write, one must wait to receive \a tag from the completion /// queue BEFORE calling Write again. + /// This is thread-safe with respect to \a Read /// /// \param[in] msg The message to be written. /// \param[in] tag The tag identifying the operation. @@ -158,6 +167,7 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, public AsyncWriterInterface<W> { public: /// Signal the client is done with the writes. + /// Thread-safe with respect to \a Read /// /// \param[in] tag The tag identifying the operation. virtual void WritesDone(void* tag) = 0; @@ -172,6 +182,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { R* response, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_, @@ -228,6 +239,7 @@ class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, public AsyncReaderInterface<R> { public: /// Signal the client is done with the writes. + /// Thread-safe with respect to \a Read /// /// \param[in] tag The tag identifying the operation. virtual void WritesDone(void* tag) = 0; @@ -298,8 +310,16 @@ class ClientAsyncReaderWriter GRPC_FINAL }; template <class W, class R> -class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncReaderInterface<R> { +class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, + public AsyncReaderInterface<R> { + public: + virtual void Finish(const W& msg, const Status& status, void* tag) = 0; + + virtual void FinishWithError(const Status& status, void* tag) = 0; +}; + +template <class W, class R> +class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> { public: explicit ServerAsyncReader(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -320,7 +340,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&read_ops_); } - void Finish(const W& msg, const Status& status, void* tag) { + void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, @@ -337,7 +357,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&finish_ops_); } - void FinishWithError(const Status& status, void* tag) { + void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE { GPR_CODEGEN_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { @@ -362,8 +382,14 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, }; template <class W> -class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W> { +class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, + public AsyncWriterInterface<W> { + public: + virtual void Finish(const Status& status, void* tag) = 0; +}; + +template <class W> +class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> { public: explicit ServerAsyncWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -390,7 +416,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&write_ops_); } - void Finish(const Status& status, void* tag) { + void Finish(const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, @@ -413,9 +439,16 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, /// Server-side interface for asynchronous bi-directional streaming. template <class W, class R> -class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { +class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { + public: + virtual void Finish(const Status& status, void* tag) = 0; +}; + +template <class W, class R> +class ServerAsyncReaderWriter GRPC_FINAL + : public ServerAsyncReaderWriterInterface<W, R> { public: explicit ServerAsyncReaderWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -448,7 +481,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&write_ops_); } - void Finish(const Status& status, void* tag) { + void Finish(const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, |