GRPC C++  0.11.0.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
async_stream.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H
35 #define GRPCXX_SUPPORT_ASYNC_STREAM_H
36 
37 #include <grpc/support/log.h>
38 #include <grpc++/channel.h>
39 #include <grpc++/client_context.h>
41 #include <grpc++/impl/call.h>
43 #include <grpc++/server_context.h>
44 #include <grpc++/support/status.h>
45 
46 namespace grpc {
47 
50  public:
52 
57  virtual void ReadInitialMetadata(void* tag) = 0;
58 
63  virtual void Finish(Status* status, void* tag) = 0;
64 };
65 
/// An interface that yields a sequence of messages of type R.
template <class R>
class AsyncReaderInterface {
 public:
  virtual ~AsyncReaderInterface() {}

  /// Read a message of type R into \a msg.
  ///
  /// \param[out] msg Destination for the message.
  /// \param[in] tag Caller-chosen value that identifies this read request.
  virtual void Read(R* msg, void* tag) = 0;
};
79 
/// An interface that can be fed a sequence of messages of type W.
template <class W>
class AsyncWriterInterface {
 public:
  virtual ~AsyncWriterInterface() {}

  /// Request the writing of \a msg with identifying tag \a tag.
  ///
  /// \param[in] msg Message to write.
  /// \param[in] tag Caller-chosen value that identifies this write request.
  virtual void Write(const W& msg, void* tag) = 0;
};
92 
93 template <class R>
95  public AsyncReaderInterface<R> {};
96 
97 template <class R>
99  public:
101  template <class W>
103  const RpcMethod& method, ClientContext* context,
104  const W& request, void* tag)
105  : context_(context), call_(channel->CreateCall(method, context, cq)) {
106  init_ops_.set_output_tag(tag);
107  init_ops_.SendInitialMetadata(context->send_initial_metadata_);
108  // TODO(ctiller): don't assert
109  GPR_ASSERT(init_ops_.SendMessage(request).ok());
110  init_ops_.ClientSendClose();
111  call_.PerformOps(&init_ops_);
112  }
113 
115  GPR_ASSERT(!context_->initial_metadata_received_);
116 
117  meta_ops_.set_output_tag(tag);
118  meta_ops_.RecvInitialMetadata(context_);
119  call_.PerformOps(&meta_ops_);
120  }
121 
122  void Read(R* msg, void* tag) GRPC_OVERRIDE {
123  read_ops_.set_output_tag(tag);
124  if (!context_->initial_metadata_received_) {
125  read_ops_.RecvInitialMetadata(context_);
126  }
127  read_ops_.RecvMessage(msg);
128  call_.PerformOps(&read_ops_);
129  }
130 
131  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
132  finish_ops_.set_output_tag(tag);
133  if (!context_->initial_metadata_received_) {
134  finish_ops_.RecvInitialMetadata(context_);
135  }
136  finish_ops_.ClientRecvStatus(context_, status);
137  call_.PerformOps(&finish_ops_);
138  }
139 
140  private:
141  ClientContext* context_;
142  Call call_;
144  init_ops_;
148 };
149 
151 template <class W>
153  public AsyncWriterInterface<W> {
154  public:
158  virtual void WritesDone(void* tag) = 0;
159 };
160 
161 template <class W>
163  public:
164  template <class R>
166  const RpcMethod& method, ClientContext* context,
167  R* response, void* tag)
168  : context_(context), call_(channel->CreateCall(method, context, cq)) {
169  finish_ops_.RecvMessage(response);
170 
171  init_ops_.set_output_tag(tag);
172  init_ops_.SendInitialMetadata(context->send_initial_metadata_);
173  call_.PerformOps(&init_ops_);
174  }
175 
177  GPR_ASSERT(!context_->initial_metadata_received_);
178 
179  meta_ops_.set_output_tag(tag);
180  meta_ops_.RecvInitialMetadata(context_);
181  call_.PerformOps(&meta_ops_);
182  }
183 
184  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
185  write_ops_.set_output_tag(tag);
186  // TODO(ctiller): don't assert
187  GPR_ASSERT(write_ops_.SendMessage(msg).ok());
188  call_.PerformOps(&write_ops_);
189  }
190 
191  void WritesDone(void* tag) GRPC_OVERRIDE {
192  writes_done_ops_.set_output_tag(tag);
193  writes_done_ops_.ClientSendClose();
194  call_.PerformOps(&writes_done_ops_);
195  }
196 
197  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
198  finish_ops_.set_output_tag(tag);
199  if (!context_->initial_metadata_received_) {
200  finish_ops_.RecvInitialMetadata(context_);
201  }
202  finish_ops_.ClientRecvStatus(context_, status);
203  call_.PerformOps(&finish_ops_);
204  }
205 
206  private:
207  ClientContext* context_;
208  Call call_;
211  CallOpSet<CallOpSendMessage> write_ops_;
212  CallOpSet<CallOpClientSendClose> writes_done_ops_;
214  CallOpClientRecvStatus> finish_ops_;
215 };
216 
218 template <class W, class R>
220  public AsyncWriterInterface<W>,
221  public AsyncReaderInterface<R> {
222  public:
226  virtual void WritesDone(void* tag) = 0;
227 };
228 
229 template <class W, class R>
231  : public ClientAsyncReaderWriterInterface<W, R> {
232  public:
234  const RpcMethod& method, ClientContext* context,
235  void* tag)
236  : context_(context), call_(channel->CreateCall(method, context, cq)) {
237  init_ops_.set_output_tag(tag);
238  init_ops_.SendInitialMetadata(context->send_initial_metadata_);
239  call_.PerformOps(&init_ops_);
240  }
241 
243  GPR_ASSERT(!context_->initial_metadata_received_);
244 
245  meta_ops_.set_output_tag(tag);
246  meta_ops_.RecvInitialMetadata(context_);
247  call_.PerformOps(&meta_ops_);
248  }
249 
250  void Read(R* msg, void* tag) GRPC_OVERRIDE {
251  read_ops_.set_output_tag(tag);
252  if (!context_->initial_metadata_received_) {
253  read_ops_.RecvInitialMetadata(context_);
254  }
255  read_ops_.RecvMessage(msg);
256  call_.PerformOps(&read_ops_);
257  }
258 
259  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
260  write_ops_.set_output_tag(tag);
261  // TODO(ctiller): don't assert
262  GPR_ASSERT(write_ops_.SendMessage(msg).ok());
263  call_.PerformOps(&write_ops_);
264  }
265 
266  void WritesDone(void* tag) GRPC_OVERRIDE {
267  writes_done_ops_.set_output_tag(tag);
268  writes_done_ops_.ClientSendClose();
269  call_.PerformOps(&writes_done_ops_);
270  }
271 
272  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
273  finish_ops_.set_output_tag(tag);
274  if (!context_->initial_metadata_received_) {
275  finish_ops_.RecvInitialMetadata(context_);
276  }
277  finish_ops_.ClientRecvStatus(context_, status);
278  call_.PerformOps(&finish_ops_);
279  }
280 
281  private:
282  ClientContext* context_;
283  Call call_;
287  CallOpSet<CallOpSendMessage> write_ops_;
288  CallOpSet<CallOpClientSendClose> writes_done_ops_;
290 };
291 
292 template <class W, class R>
293 class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
294  public AsyncReaderInterface<R> {
295  public:
297  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
298 
300  GPR_ASSERT(!ctx_->sent_initial_metadata_);
301 
302  meta_ops_.set_output_tag(tag);
303  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
304  ctx_->sent_initial_metadata_ = true;
305  call_.PerformOps(&meta_ops_);
306  }
307 
308  void Read(R* msg, void* tag) GRPC_OVERRIDE {
309  read_ops_.set_output_tag(tag);
310  read_ops_.RecvMessage(msg);
311  call_.PerformOps(&read_ops_);
312  }
313 
314  void Finish(const W& msg, const Status& status, void* tag) {
315  finish_ops_.set_output_tag(tag);
316  if (!ctx_->sent_initial_metadata_) {
317  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
318  ctx_->sent_initial_metadata_ = true;
319  }
320  // The response is dropped if the status is not OK.
321  if (status.ok()) {
322  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
323  finish_ops_.SendMessage(msg));
324  } else {
325  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
326  }
327  call_.PerformOps(&finish_ops_);
328  }
329 
330  void FinishWithError(const Status& status, void* tag) {
331  GPR_ASSERT(!status.ok());
332  finish_ops_.set_output_tag(tag);
333  if (!ctx_->sent_initial_metadata_) {
334  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
335  ctx_->sent_initial_metadata_ = true;
336  }
337  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
338  call_.PerformOps(&finish_ops_);
339  }
340 
341  private:
342  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
343 
344  Call call_;
345  ServerContext* ctx_;
346  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
347  CallOpSet<CallOpRecvMessage<R>> read_ops_;
348  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
349  CallOpServerSendStatus> finish_ops_;
350 };
351 
352 template <class W>
353 class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
354  public AsyncWriterInterface<W> {
355  public:
357  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
358 
360  GPR_ASSERT(!ctx_->sent_initial_metadata_);
361 
362  meta_ops_.set_output_tag(tag);
363  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
364  ctx_->sent_initial_metadata_ = true;
365  call_.PerformOps(&meta_ops_);
366  }
367 
368  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
369  write_ops_.set_output_tag(tag);
370  if (!ctx_->sent_initial_metadata_) {
371  write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
372  ctx_->sent_initial_metadata_ = true;
373  }
374  // TODO(ctiller): don't assert
375  GPR_ASSERT(write_ops_.SendMessage(msg).ok());
376  call_.PerformOps(&write_ops_);
377  }
378 
379  void Finish(const Status& status, void* tag) {
380  finish_ops_.set_output_tag(tag);
381  if (!ctx_->sent_initial_metadata_) {
382  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
383  ctx_->sent_initial_metadata_ = true;
384  }
385  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
386  call_.PerformOps(&finish_ops_);
387  }
388 
389  private:
390  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
391 
392  Call call_;
393  ServerContext* ctx_;
394  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
395  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
396  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
397 };
398 
400 template <class W, class R>
401 class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
402  public AsyncWriterInterface<W>,
403  public AsyncReaderInterface<R> {
404  public:
406  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
407 
409  GPR_ASSERT(!ctx_->sent_initial_metadata_);
410 
411  meta_ops_.set_output_tag(tag);
412  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
413  ctx_->sent_initial_metadata_ = true;
414  call_.PerformOps(&meta_ops_);
415  }
416 
417  void Read(R* msg, void* tag) GRPC_OVERRIDE {
418  read_ops_.set_output_tag(tag);
419  read_ops_.RecvMessage(msg);
420  call_.PerformOps(&read_ops_);
421  }
422 
423  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
424  write_ops_.set_output_tag(tag);
425  if (!ctx_->sent_initial_metadata_) {
426  write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
427  ctx_->sent_initial_metadata_ = true;
428  }
429  // TODO(ctiller): don't assert
430  GPR_ASSERT(write_ops_.SendMessage(msg).ok());
431  call_.PerformOps(&write_ops_);
432  }
433 
434  void Finish(const Status& status, void* tag) {
435  finish_ops_.set_output_tag(tag);
436  if (!ctx_->sent_initial_metadata_) {
437  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
438  ctx_->sent_initial_metadata_ = true;
439  }
440  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
441  call_.PerformOps(&finish_ops_);
442  }
443 
444  private:
445  friend class ::grpc::Server;
446 
447  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
448 
449  Call call_;
450  ServerContext* ctx_;
455 };
456 
457 } // namespace grpc
458 
459 #endif // GRPCXX_SUPPORT_ASYNC_STREAM_H
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:49
Definition: channel.h:64
ClientAsyncReaderWriter(Channel *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, void *tag)
Definition: async_stream.h:233
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:184
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:417
void WritesDone(void *tag) GRPC_OVERRIDE
Signal the client is done with the writes.
Definition: async_stream.h:191
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:359
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes.
Definition: call.h:426
virtual void Write(const W &msg, void *tag)=0
Request the writing of msg with identifying tag tag.
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:368
#define GRPC_FINAL
Definition: config.h:71
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:259
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:308
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:408
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes.
Definition: client_context.h:149
void FinishWithError(const Status &status, void *tag)
Definition: async_stream.h:330
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:423
virtual ~AsyncReaderInterface()
Definition: async_stream.h:70
Definition: async_stream.h:94
void WritesDone(void *tag) GRPC_OVERRIDE
Signal the client is done with the writes.
Definition: async_stream.h:266
Definition: call.h:560
Client-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:219
ServerAsyncWriter(ServerContext *ctx)
Definition: async_stream.h:356
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification completion.
Definition: async_stream.h:272
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:176
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:82
Definition: channel.h:62
Primary implementation of CallOpSetInterface.
Definition: call.h:502
Definition: server_context.h:89
void Finish(const W &msg, const Status &status, void *tag)
Definition: async_stream.h:314
A thin wrapper around grpc_completion_queue (see src/core/surface/completion_queue.h).
Definition: completion_queue.h:81
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:51
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification completion.
Definition: async_stream.h:197
void Finish(const Status &status, void *tag)
Definition: async_stream.h:379
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
Definition: rpc_method.h:43
ClientAsyncReader(Channel *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, const W &request, void *tag)
Create a stream and write the first request out.
Definition: async_stream.h:102
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification completion.
Definition: async_stream.h:131
void PerformOps(CallOpSetInterface *ops)
bool ok() const
Is the status OK?
Definition: status.h:67
Did it work? If it didn't, why?
Definition: status.h:45
virtual void Finish(Status *status, void *tag)=0
Request notification completion.
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:299
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:122
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:250
ServerAsyncReaderWriter(ServerContext *ctx)
Definition: async_stream.h:405
Definition: channel.h:60
virtual ~AsyncWriterInterface()
Definition: async_stream.h:84
void Finish(const Status &status, void *tag)
Definition: async_stream.h:434
ClientAsyncWriter(Channel *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, R *response, void *tag)
Definition: async_stream.h:165
#define GRPC_OVERRIDE
Definition: config.h:77
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:242
Definition: call.h:396
virtual void Read(R *msg, void *tag)=0
Read a message of type R into msg.
Definition: call.h:289
ServerAsyncReader(ServerContext *ctx)
Definition: async_stream.h:296
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:68
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: channel.h:69
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:114
Common interface for client side asynchronous writing.
Definition: async_stream.h:152