aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++/impl/codegen/async_stream.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/grpc++/impl/codegen/async_stream.h')
-rw-r--r--include/grpc++/impl/codegen/async_stream.h57
1 files changed, 45 insertions, 12 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index a607a47106..e96d224ddb 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -52,11 +52,14 @@ class ClientAsyncStreamingInterface {
/// Request notification of the reading of the initial metadata. Completion
/// will be notified by \a tag on the associated completion queue.
+ /// This call is optional, but if it is used, it cannot be used concurrently
+ /// with or after the \a Read method.
///
/// \param[in] tag Tag identifying this request.
virtual void ReadInitialMetadata(void* tag) = 0;
- /// Request notification completion.
+ /// Indicate that the stream is to be finished and request notification
+ /// Should not be used concurrently with other operations
///
/// \param[out] status To be updated with the operation status.
/// \param[in] tag Tag identifying this request.
@@ -71,6 +74,11 @@ class AsyncReaderInterface {
/// Read a message of type \a R into \a msg. Completion will be notified by \a
/// tag on the associated completion queue.
+ /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
+ /// should not be called concurrently with other streaming APIs
+ /// on the same stream. It is not meaningful to call it concurrently
+ /// with another \a Read on the same stream since reads on the same stream
+ /// are delivered in order.
///
/// \param[out] msg Where to eventually store the read message.
/// \param[in] tag The tag identifying the operation.
@@ -88,6 +96,7 @@ class AsyncWriterInterface {
/// Only one write may be outstanding at any given time. This means that
/// after calling Write, one must wait to receive \a tag from the completion
/// queue BEFORE calling Write again.
+ /// This is thread-safe with respect to \a Read
///
/// \param[in] msg The message to be written.
/// \param[in] tag The tag identifying the operation.
@@ -158,6 +167,7 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
/// Signal the client is done with the writes.
+ /// Thread-safe with respect to \a Read
///
/// \param[in] tag The tag identifying the operation.
virtual void WritesDone(void* tag) = 0;
@@ -172,6 +182,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
R* response, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
finish_ops_.RecvMessage(response);
+ finish_ops_.AllowNoMessage();
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
@@ -228,6 +239,7 @@ class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
/// Signal the client is done with the writes.
+ /// Thread-safe with respect to \a Read
///
/// \param[in] tag The tag identifying the operation.
virtual void WritesDone(void* tag) = 0;
@@ -298,8 +310,16 @@ class ClientAsyncReaderWriter GRPC_FINAL
};
template <class W, class R>
-class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
- public AsyncReaderInterface<R> {
+class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
+ public:
+ virtual void Finish(const W& msg, const Status& status, void* tag) = 0;
+
+ virtual void FinishWithError(const Status& status, void* tag) = 0;
+};
+
+template <class W, class R>
+class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> {
public:
explicit ServerAsyncReader(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@@ -320,7 +340,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&read_ops_);
}
- void Finish(const W& msg, const Status& status, void* tag) {
+ void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
@@ -337,7 +357,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&finish_ops_);
}
- void FinishWithError(const Status& status, void* tag) {
+ void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE {
GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
@@ -362,8 +382,14 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
};
template <class W>
-class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
- public AsyncWriterInterface<W> {
+class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
+ public AsyncWriterInterface<W> {
+ public:
+ virtual void Finish(const Status& status, void* tag) = 0;
+};
+
+template <class W>
+class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> {
public:
explicit ServerAsyncWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@@ -390,7 +416,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&write_ops_);
}
- void Finish(const Status& status, void* tag) {
+ void Finish(const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
@@ -413,9 +439,16 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
/// Server-side interface for asynchronous bi-directional streaming.
template <class W, class R>
-class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
- public AsyncWriterInterface<W>,
- public AsyncReaderInterface<R> {
+class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
+ public:
+ virtual void Finish(const Status& status, void* tag) = 0;
+};
+
+template <class W, class R>
+class ServerAsyncReaderWriter GRPC_FINAL
+ : public ServerAsyncReaderWriterInterface<W, R> {
public:
explicit ServerAsyncReaderWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@@ -448,7 +481,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&write_ops_);
}
- void Finish(const Status& status, void* tag) {
+ void Finish(const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,