34 #ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H
35 #define GRPCXX_SUPPORT_ASYNC_STREAM_H
37 #include <grpc/support/log.h>
77 virtual void Read(R* msg,
void* tag) = 0;
90 virtual void Write(
const W& msg,
void* tag) = 0;
104 const W& request,
void* tag)
105 : context_(context), call_(channel->CreateCall(method, context, cq)) {
106 init_ops_.set_output_tag(tag);
107 init_ops_.SendInitialMetadata(context->send_initial_metadata_);
109 GPR_ASSERT(init_ops_.SendMessage(request).ok());
110 init_ops_.ClientSendClose();
115 GPR_ASSERT(!context_->initial_metadata_received_);
117 meta_ops_.set_output_tag(tag);
118 meta_ops_.RecvInitialMetadata(context_);
123 read_ops_.set_output_tag(tag);
124 if (!context_->initial_metadata_received_) {
125 read_ops_.RecvInitialMetadata(context_);
127 read_ops_.RecvMessage(msg);
132 finish_ops_.set_output_tag(tag);
133 if (!context_->initial_metadata_received_) {
134 finish_ops_.RecvInitialMetadata(context_);
136 finish_ops_.ClientRecvStatus(context_, status);
167 R* response,
void* tag)
168 : context_(context), call_(channel->CreateCall(method, context, cq)) {
169 finish_ops_.RecvMessage(response);
171 init_ops_.set_output_tag(tag);
172 init_ops_.SendInitialMetadata(context->send_initial_metadata_);
177 GPR_ASSERT(!context_->initial_metadata_received_);
179 meta_ops_.set_output_tag(tag);
180 meta_ops_.RecvInitialMetadata(context_);
185 write_ops_.set_output_tag(tag);
187 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
192 writes_done_ops_.set_output_tag(tag);
193 writes_done_ops_.ClientSendClose();
198 finish_ops_.set_output_tag(tag);
199 if (!context_->initial_metadata_received_) {
200 finish_ops_.RecvInitialMetadata(context_);
202 finish_ops_.ClientRecvStatus(context_, status);
218 template <
class W,
class R>
229 template <
class W,
class R>
236 : context_(context), call_(channel->CreateCall(method, context, cq)) {
237 init_ops_.set_output_tag(tag);
238 init_ops_.SendInitialMetadata(context->send_initial_metadata_);
243 GPR_ASSERT(!context_->initial_metadata_received_);
245 meta_ops_.set_output_tag(tag);
246 meta_ops_.RecvInitialMetadata(context_);
251 read_ops_.set_output_tag(tag);
252 if (!context_->initial_metadata_received_) {
253 read_ops_.RecvInitialMetadata(context_);
255 read_ops_.RecvMessage(msg);
260 write_ops_.set_output_tag(tag);
262 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
267 writes_done_ops_.set_output_tag(tag);
268 writes_done_ops_.ClientSendClose();
273 finish_ops_.set_output_tag(tag);
274 if (!context_->initial_metadata_received_) {
275 finish_ops_.RecvInitialMetadata(context_);
277 finish_ops_.ClientRecvStatus(context_, status);
292 template <
class W,
class R>
293 class ServerAsyncReader
GRPC_FINAL :
public ServerAsyncStreamingInterface,
294 public AsyncReaderInterface<R> {
297 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
300 GPR_ASSERT(!ctx_->sent_initial_metadata_);
302 meta_ops_.set_output_tag(tag);
303 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
304 ctx_->sent_initial_metadata_ =
true;
309 read_ops_.set_output_tag(tag);
310 read_ops_.RecvMessage(msg);
315 finish_ops_.set_output_tag(tag);
316 if (!ctx_->sent_initial_metadata_) {
317 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
318 ctx_->sent_initial_metadata_ =
true;
322 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
323 finish_ops_.SendMessage(msg));
325 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
331 GPR_ASSERT(!status.
ok());
332 finish_ops_.set_output_tag(tag);
333 if (!ctx_->sent_initial_metadata_) {
334 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
335 ctx_->sent_initial_metadata_ =
true;
337 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
346 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
347 CallOpSet<CallOpRecvMessage<R>> read_ops_;
348 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
349 CallOpServerSendStatus> finish_ops_;
353 class ServerAsyncWriter
GRPC_FINAL :
public ServerAsyncStreamingInterface,
354 public AsyncWriterInterface<W> {
357 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
360 GPR_ASSERT(!ctx_->sent_initial_metadata_);
362 meta_ops_.set_output_tag(tag);
363 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
364 ctx_->sent_initial_metadata_ =
true;
369 write_ops_.set_output_tag(tag);
370 if (!ctx_->sent_initial_metadata_) {
371 write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
372 ctx_->sent_initial_metadata_ =
true;
375 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
380 finish_ops_.set_output_tag(tag);
381 if (!ctx_->sent_initial_metadata_) {
382 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
383 ctx_->sent_initial_metadata_ =
true;
385 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
394 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
395 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
396 CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
400 template <
class W,
class R>
401 class ServerAsyncReaderWriter
GRPC_FINAL :
public ServerAsyncStreamingInterface,
402 public AsyncWriterInterface<W>,
403 public AsyncReaderInterface<R> {
406 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
409 GPR_ASSERT(!ctx_->sent_initial_metadata_);
411 meta_ops_.set_output_tag(tag);
412 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
413 ctx_->sent_initial_metadata_ =
true;
418 read_ops_.set_output_tag(tag);
419 read_ops_.RecvMessage(msg);
424 write_ops_.set_output_tag(tag);
425 if (!ctx_->sent_initial_metadata_) {
426 write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
427 ctx_->sent_initial_metadata_ =
true;
430 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
435 finish_ops_.set_output_tag(tag);
436 if (!ctx_->sent_initial_metadata_) {
437 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
438 ctx_->sent_initial_metadata_ =
true;
440 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
445 friend class ::grpc::Server;
459 #endif // GRPCXX_SUPPORT_ASYNC_STREAM_H
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:49
ClientAsyncReaderWriter(Channel *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, void *tag)
Definition: async_stream.h:233
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:184
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:417
void WritesDone(void *tag) GRPC_OVERRIDE
Signal the client is done with the writes.
Definition: async_stream.h:191
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:359
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes.
virtual void Write(const W &msg, void *tag)=0
Request the writing of msg with identifying tag tag.
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:368
#define GRPC_FINAL
Definition: config.h:71
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:259
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:308
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:408
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes.
Definition: client_context.h:149
void FinishWithError(const Status &status, void *tag)
Definition: async_stream.h:330
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:423
virtual ~AsyncReaderInterface()
Definition: async_stream.h:70
Definition: async_stream.h:94
void WritesDone(void *tag) GRPC_OVERRIDE
Signal the client is done with the writes.
Definition: async_stream.h:266
Client-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:219
ServerAsyncWriter(ServerContext *ctx)
Definition: async_stream.h:356
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification of completion.
Definition: async_stream.h:272
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:176
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:82
Primary implementation of CallOpSetInterface.
Definition: call.h:502
Definition: server_context.h:89
void Finish(const W &msg, const Status &status, void *tag)
Definition: async_stream.h:314
A thin wrapper around grpc_completion_queue (see src/core/surface/completion_queue.h).
Definition: completion_queue.h:81
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:51
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification of completion.
Definition: async_stream.h:197
void Finish(const Status &status, void *tag)
Definition: async_stream.h:379
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
Definition: rpc_method.h:43
ClientAsyncReader(Channel *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, const W &request, void *tag)
Create a stream and write the first request out.
Definition: async_stream.h:102
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification of completion.
Definition: async_stream.h:131
void PerformOps(CallOpSetInterface *ops)
Definition: call.cc:85
bool ok() const
Is the status OK?
Definition: status.h:67
Did it work? If it didn't, why?
Definition: status.h:45
virtual void Finish(Status *status, void *tag)=0
Request notification of completion.
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:299
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:122
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:250
ServerAsyncReaderWriter(ServerContext *ctx)
Definition: async_stream.h:405
virtual ~AsyncWriterInterface()
Definition: async_stream.h:84
void Finish(const Status &status, void *tag)
Definition: async_stream.h:434
ClientAsyncWriter(Channel *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, R *response, void *tag)
Definition: async_stream.h:165
#define GRPC_OVERRIDE
Definition: config.h:77
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:242
virtual void Read(R *msg, void *tag)=0
Read a message of type R into msg.
ServerAsyncReader(ServerContext *ctx)
Definition: async_stream.h:296
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:68
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: channel.h:69
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:114
Common interface for client side asynchronous writing.
Definition: async_stream.h:152