GRPC C++  0.10.0.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
call.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_IMPL_CALL_H
35 #define GRPCXX_IMPL_CALL_H
36 
37 #include <grpc/support/alloc.h>
38 #include <grpc++/client_context.h>
40 #include <grpc++/config.h>
41 #include <grpc++/status.h>
43 
44 #include <functional>
45 #include <memory>
46 #include <map>
47 
48 #include <string.h>
49 
50 struct grpc_call;
51 struct grpc_op;
52 
53 namespace grpc {
54 
55 class ByteBuffer;
56 class Call;
57 
58 void FillMetadataMap(grpc_metadata_array* arr,
59  std::multimap<grpc::string, grpc::string>* metadata);
60 grpc_metadata* FillMetadataArray(
61  const std::multimap<grpc::string, grpc::string>& metadata);
62 
64 class WriteOptions {
65  public:
66  WriteOptions() : flags_(0) {}
67  WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
68 
// Resets the whole flag bitset to zero.
70  inline void Clear() {
71  flags_ = 0;
72  }
73 
// Returns the raw flag bitset held in flags_.
75  inline gpr_uint32 flags() const {
76  return flags_;
77  }
78 
83  SetBit(GRPC_WRITE_NO_COMPRESS);
84  return *this;
85  }
86 
91  ClearBit(GRPC_WRITE_NO_COMPRESS);
92  return *this;
93  }
94 
// Reports whether the GRPC_WRITE_NO_COMPRESS bit is currently set.
99  inline bool get_no_compression() const {
100  return GetBit(GRPC_WRITE_NO_COMPRESS);
101  }
102 
108  SetBit(GRPC_WRITE_BUFFER_HINT);
109  return *this;
110  }
111 
117  ClearBit(GRPC_WRITE_BUFFER_HINT);
118  return *this;
119  }
120 
// Reports whether the GRPC_WRITE_BUFFER_HINT bit is currently set.
125  inline bool get_buffer_hint() const {
126  return GetBit(GRPC_WRITE_BUFFER_HINT);
127  }
128 
130  flags_ = rhs.flags_;
131  return *this;
132  }
133 
134  private:
// Turns on every bit of flags_ that is set in mask.
135  void SetBit(const gpr_int32 mask) {
136  flags_ |= mask;
137  }
138 
// Turns off every bit of flags_ that is set in mask.
139  void ClearBit(const gpr_int32 mask) {
140  flags_ &= ~mask;
141  }
142 
// Returns true when flags_ and mask have at least one set bit in common
// (the bitwise AND is converted to bool on return).
143  bool GetBit(const gpr_int32 mask) const {
144  return flags_ & mask;
145  }
146 
147  gpr_uint32 flags_;
148 };
149 
// Placeholder op whose AddOp and FinishOp bodies are empty. The integer
// parameter I makes each instantiation a distinct type; the CallOpSet
// templates in this file use CallNoOp<1>..CallNoOp<6> as default arguments.
152 template <int I>
153 class CallNoOp {
154  protected:
155  void AddOp(grpc_op* ops, size_t* nops) {}  // writes no entry into ops
156  void FinishOp(bool* status, int max_message_size) {}  // nothing to finish
157 };
158 
160  public:
162 
164  const std::multimap<grpc::string, grpc::string>& metadata) {
165  send_ = true;
166  initial_metadata_count_ = metadata.size();
168  }
169 
170  protected:
// Writes a GRPC_OP_SEND_INITIAL_METADATA entry into ops, but only when send_
// is set; otherwise ops and *nops are left untouched.
171  void AddOp(grpc_op* ops, size_t* nops) {
172  if (!send_) return;
173  grpc_op* op = &ops[(*nops)++];  // use slot ops[*nops], then increment *nops
174  op->op = GRPC_OP_SEND_INITIAL_METADATA;
175  op->flags = 0;
176  op->data.send_initial_metadata.count = initial_metadata_count_;
177  op->data.send_initial_metadata.metadata = initial_metadata_;
178  }
// Frees initial_metadata_ with gpr_free and clears send_. Does nothing when
// send_ is not set. Neither status nor max_message_size is read.
179  void FinishOp(bool* status, int max_message_size) {
180  if (!send_) return;
181  gpr_free(initial_metadata_);
182  send_ = false;
183  }
184 
185  bool send_;
187  grpc_metadata* initial_metadata_;
188 };
189 
191  public:
192  CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}
193 
196  template <class M>
197  Status SendMessage(const M& message,
198  const WriteOptions& options) GRPC_MUST_USE_RESULT;
199 
200  template <class M>
201  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
202 
203  protected:
// Writes a GRPC_OP_SEND_MESSAGE entry into ops when a send buffer is pending
// (send_buf_ is non-null). The entry's flags come from write_options_.
204  void AddOp(grpc_op* ops, size_t* nops) {
205  if (send_buf_ == nullptr) return;
206  grpc_op* op = &ops[(*nops)++];  // use slot ops[*nops], then increment *nops
207  op->op = GRPC_OP_SEND_MESSAGE;
208  op->flags = write_options_.flags();
209  op->data.send_message = send_buf_;
210  // Flags are per-message: clear them after use.
211  write_options_.Clear();
212  }
// Destroys send_buf_ only when own_buf_ is set, then resets send_buf_ to
// nullptr in every case. Neither status nor max_message_size is read.
213  void FinishOp(bool* status, int max_message_size) {
214  if (own_buf_) grpc_byte_buffer_destroy(send_buf_);
215  send_buf_ = nullptr;
216  }
217 
218  private:
219  grpc_byte_buffer* send_buf_;
220  WriteOptions write_options_;
221  bool own_buf_;
222 };
223 
224 template <class M>
226  const WriteOptions& options) {
227  write_options_ = options;
228  return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
229 }
230 
231 template <class M>
233  return SendMessage(message, WriteOptions());
234 }
235 
236 template <class R>
238  public:
239  CallOpRecvMessage() : got_message(false), message_(nullptr) {}
240 
241  void RecvMessage(R* message) { message_ = message; }
242 
244 
245  protected:
// Writes a GRPC_OP_RECV_MESSAGE entry into ops when a destination message has
// been registered (message_ is non-null). The entry points at recv_buf_.
246  void AddOp(grpc_op* ops, size_t* nops) {
247  if (message_ == nullptr) return;
248  grpc_op* op = &ops[(*nops)++];  // use slot ops[*nops], then increment *nops
249  op->op = GRPC_OP_RECV_MESSAGE;
250  op->flags = 0;
251  op->data.recv_message = &recv_buf_;
252  }
253 
// Finishes a pending receive; does nothing when message_ is null.
//  - buffer present, *status true:  got_message = true and *status becomes
//    the ok() result of deserializing recv_buf_ into message_.
//  - buffer present, *status false: got_message = false and the buffer is
//    destroyed.
//  - no buffer: got_message = false and *status is forced to false.
// message_ is reset to nullptr afterwards in all three cases.
// NOTE(review): the success path does not destroy recv_buf_ here; presumably
// SerializationTraits<R>::Deserialize takes ownership of it - confirm.
254  void FinishOp(bool* status, int max_message_size) {
255  if (message_ == nullptr) return;
256  if (recv_buf_) {
257  if (*status) {
258  got_message = true;
259  *status = SerializationTraits<R>::Deserialize(recv_buf_, message_,
260  max_message_size)
261  .ok();
262  } else {
263  got_message = false;
264  grpc_byte_buffer_destroy(recv_buf_);
265  }
266  } else {
267  got_message = false;
268  *status = false;
269  }
270  message_ = nullptr;
271  }
272 
273  private:
274  R* message_;
275  grpc_byte_buffer* recv_buf_;
276 };
277 
278 namespace CallOpGenericRecvMessageHelper {
280  public:
281  virtual Status Deserialize(grpc_byte_buffer* buf, int max_message_size) = 0;
282 };
283 
284 template <class R>
286  public:
287  DeserializeFuncType(R* message) : message_(message) {}
288  Status Deserialize(grpc_byte_buffer* buf,
289  int max_message_size) GRPC_OVERRIDE {
290  return SerializationTraits<R>::Deserialize(buf, message_, max_message_size);
291  }
292 
293  private:
294  R* message_; // Not a managed pointer because management is external to this
295 };
296 } // namespace CallOpGenericRecvMessageHelper
297 
299  public:
301 
302  template <class R>
303  void RecvMessage(R* message) {
304  deserialize_.reset(
306  }
307 
309 
310  protected:
// Writes a GRPC_OP_RECV_MESSAGE entry into ops when a deserializer has been
// installed (deserialize_ is non-empty). The entry points at recv_buf_.
311  void AddOp(grpc_op* ops, size_t* nops) {
312  if (!deserialize_) return;
313  grpc_op* op = &ops[(*nops)++];  // use slot ops[*nops], then increment *nops
314  op->op = GRPC_OP_RECV_MESSAGE;
315  op->flags = 0;
316  op->data.recv_message = &recv_buf_;
317  }
318 
// Finishes a pending receive; does nothing when deserialize_ is empty.
//  - buffer present, *status true:  got_message = true and *status becomes
//    the ok() result of deserialize_->Deserialize on recv_buf_.
//  - buffer present, *status false: got_message = false and the buffer is
//    destroyed.
//  - no buffer: got_message = false and *status is forced to false.
// deserialize_ is reset afterwards in all three cases.
// NOTE(review): the success path does not destroy recv_buf_ here; presumably
// the Deserialize call takes ownership of it - confirm.
319  void FinishOp(bool* status, int max_message_size) {
320  if (!deserialize_) return;
321  if (recv_buf_) {
322  if (*status) {
323  got_message = true;
324  *status = deserialize_->Deserialize(recv_buf_, max_message_size).ok();
325  } else {
326  got_message = false;
327  grpc_byte_buffer_destroy(recv_buf_);
328  }
329  } else {
330  got_message = false;
331  *status = false;
332  }
333  deserialize_.reset();
334  }
335 
336  private:
337  std::unique_ptr<CallOpGenericRecvMessageHelper::DeserializeFunc> deserialize_;
338  grpc_byte_buffer* recv_buf_;
339 };
340 
342  public:
343  CallOpClientSendClose() : send_(false) {}
344 
345  void ClientSendClose() { send_ = true; }
346 
347  protected:
// Writes a GRPC_OP_SEND_CLOSE_FROM_CLIENT entry into ops when send_ is set;
// otherwise ops and *nops are left untouched.
348  void AddOp(grpc_op* ops, size_t* nops) {
349  if (!send_) return;
350  grpc_op* op = &ops[(*nops)++];  // use slot ops[*nops], then increment *nops
351  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
352  op->flags = 0;
353  }
354  void FinishOp(bool* status, int max_message_size) { send_ = false; }
355 
356  private:
357  bool send_;
358 };
359 
361  public:
362  CallOpServerSendStatus() : send_status_available_(false) {}
363 
365  const std::multimap<grpc::string, grpc::string>& trailing_metadata,
366  const Status& status) {
367  trailing_metadata_count_ = trailing_metadata.size();
368  trailing_metadata_ = FillMetadataArray(trailing_metadata);
369  send_status_available_ = true;
370  send_status_code_ = static_cast<grpc_status_code>(status.error_code());
371  send_status_details_ = status.error_message();
372  }
373 
374  protected:
// Writes a GRPC_OP_SEND_STATUS_FROM_SERVER entry into ops when a status has
// been queued (send_status_available_ is set). An empty details string is
// passed as nullptr rather than as a pointer to an empty string.
375  void AddOp(grpc_op* ops, size_t* nops) {
376  if (!send_status_available_) return;
377  grpc_op* op = &ops[(*nops)++];  // use slot ops[*nops], then increment *nops
378  op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
379  op->data.send_status_from_server.trailing_metadata_count =
380  trailing_metadata_count_;
381  op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
382  op->data.send_status_from_server.status = send_status_code_;
383  op->data.send_status_from_server.status_details =
384  send_status_details_.empty() ? nullptr : send_status_details_.c_str();
385  op->flags = 0;
386  }
387 
// Frees trailing_metadata_ with gpr_free and clears send_status_available_.
// Does nothing when no status was queued. Neither parameter is read.
388  void FinishOp(bool* status, int max_message_size) {
389  if (!send_status_available_) return;
390  gpr_free(trailing_metadata_);
391  send_status_available_ = false;
392  }
393 
394  private:
395  bool send_status_available_;
396  grpc_status_code send_status_code_;
397  grpc::string send_status_details_;
398  size_t trailing_metadata_count_;
399  grpc_metadata* trailing_metadata_;
400 };
401 
403  public:
404  CallOpRecvInitialMetadata() : recv_initial_metadata_(nullptr) {}
405 
407  context->initial_metadata_received_ = true;
408  recv_initial_metadata_ = &context->recv_initial_metadata_;
409  }
410 
411  protected:
// Writes a GRPC_OP_RECV_INITIAL_METADATA entry into ops when a destination
// map has been registered (recv_initial_metadata_ is non-null).
412  void AddOp(grpc_op* ops, size_t* nops) {
413  if (!recv_initial_metadata_) return;
414  memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));  // zero the array before its address is handed out
415  grpc_op* op = &ops[(*nops)++];  // use slot ops[*nops], then increment *nops
416  op->op = GRPC_OP_RECV_INITIAL_METADATA;
417  op->data.recv_initial_metadata = &recv_initial_metadata_arr_;
418  op->flags = 0;
419  }
// Passes the received metadata array and the destination map to
// FillMetadataMap, then resets recv_initial_metadata_ to nullptr. Does
// nothing when no destination map is registered. Neither parameter is read.
420  void FinishOp(bool* status, int max_message_size) {
421  if (recv_initial_metadata_ == nullptr) return;
422  FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
423  recv_initial_metadata_ = nullptr;
424  }
425 
426  private:
427  std::multimap<grpc::string, grpc::string>* recv_initial_metadata_;
428  grpc_metadata_array recv_initial_metadata_arr_;
429 };
430 
432  public:
433  CallOpClientRecvStatus() : recv_status_(nullptr) {}
434 
// Registers where the results go: trailing metadata into
// context->trailing_metadata_, and the final status into *status. A non-null
// status pointer is what makes AddOp/FinishOp do any work.
435  void ClientRecvStatus(ClientContext* context, Status* status) {
436  recv_trailing_metadata_ = &context->trailing_metadata_;
437  recv_status_ = status;
438  }
439 
440  protected:
// Writes a GRPC_OP_RECV_STATUS_ON_CLIENT entry into ops when a status
// destination is registered (recv_status_ is non-null). All receive-side
// storage is reset first, and the entry is given the addresses of those
// members as its output locations.
441  void AddOp(grpc_op* ops, size_t* nops) {
442  if (recv_status_ == nullptr) return;
443  memset(&recv_trailing_metadata_arr_, 0,
444  sizeof(recv_trailing_metadata_arr_));
445  status_details_ = nullptr;
446  status_details_capacity_ = 0;
447  grpc_op* op = &ops[(*nops)++];  // use slot ops[*nops], then increment *nops
448  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
449  op->data.recv_status_on_client.trailing_metadata =
450  &recv_trailing_metadata_arr_;
451  op->data.recv_status_on_client.status = &status_code_;
452  op->data.recv_status_on_client.status_details = &status_details_;
453  op->data.recv_status_on_client.status_details_capacity =
454  &status_details_capacity_;
455  op->flags = 0;
456  }
457 
// Publishes the received results; does nothing when recv_status_ is null.
// Trailing metadata goes through FillMetadataMap, and *recv_status_ is built
// from status_code_ plus the details string (empty when status_details_ is
// null). The details buffer is then freed and recv_status_ reset to nullptr.
// Neither status nor max_message_size is read.
458  void FinishOp(bool* status, int max_message_size) {
459  if (recv_status_ == nullptr) return;
460  FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
461  *recv_status_ = Status(
462  static_cast<StatusCode>(status_code_),
463  status_details_ ? grpc::string(status_details_) : grpc::string());
464  gpr_free(status_details_);  // called even when status_details_ is null
465  recv_status_ = nullptr;
466  }
467 
468  private:
469  std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_;
470  Status* recv_status_;
471  grpc_metadata_array recv_trailing_metadata_arr_;
472  grpc_status_code status_code_;
473  char* status_details_;
474  size_t status_details_capacity_;
475 };
476 
483  public:
487  virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
488 
// Stores the size limit in max_message_size_; CallOpSet::FinalizeResult in
// this file forwards that member to every op's FinishOp.
489  void set_max_message_size(int max_message_size) {
490  max_message_size_ = max_message_size;
491  }
492 
493  protected:
495 };
496 
503 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
504  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
505  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
507  public Op1,
508  public Op2,
509  public Op3,
510  public Op4,
511  public Op5,
512  public Op6 {
513  public:
514  CallOpSet() : return_tag_(this) {}
// Calls AddOp on each of the six op bases, in order Op1..Op6, with the same
// ops array and nops counter, so each base may add its entry in turn.
515  void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
516  this->Op1::AddOp(ops, nops);
517  this->Op2::AddOp(ops, nops);
518  this->Op3::AddOp(ops, nops);
519  this->Op4::AddOp(ops, nops);
520  this->Op5::AddOp(ops, nops);
521  this->Op6::AddOp(ops, nops);
522  }
523 
// Calls FinishOp on each of the six op bases, in order Op1..Op6, passing the
// shared status pointer and max_message_size_. Then writes return_tag_ to
// *tag. Always returns true.
524  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
525  this->Op1::FinishOp(status, max_message_size_);
526  this->Op2::FinishOp(status, max_message_size_);
527  this->Op3::FinishOp(status, max_message_size_);
528  this->Op4::FinishOp(status, max_message_size_);
529  this->Op5::FinishOp(status, max_message_size_);
530  this->Op6::FinishOp(status, max_message_size_);
531  *tag = return_tag_;  // the constructor sets this to `this`; set_output_tag overrides it
532  return true;
533  }
534 
535  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
536 
537  private:
538  void* return_tag_;
539 };
540 
545 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
546  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
547  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
549  : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
550  public:
551  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
553  return Base::FinalizeResult(tag, status) && false;
554  }
555 };
556 
557 // Channel and Server implement this to allow them to hook performing ops
558 class CallHook {
559  public:
560  virtual ~CallHook() {}  // virtual: implementers may be destroyed through a CallHook*
561  virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;  // pure virtual; the implementer decides how ops is performed on call
562 };
563 
564 // Straightforward wrapping of the C call object
566  public:
567  /* call is owned by the caller */
568  Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq);
569  Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq,
570  int max_message_size);
571 
572  void PerformOps(CallOpSetInterface* ops);
573 
574  grpc_call* call() { return call_; }
575  CompletionQueue* cq() { return cq_; }
576 
577  int max_message_size() { return max_message_size_; }
578 
579  private:
580  CallHook* call_hook_;
581  CompletionQueue* cq_;
582  grpc_call* call_;
583  int max_message_size_;
584 };
585 
586 } // namespace grpc
587 
588 #endif // GRPCXX_IMPL_CALL_H
void ServerSendStatus(const std::multimap< grpc::string, grpc::string > &trailing_metadata, const Status &status)
Definition: call.h:364
Call(grpc_call *call, CallHook *call_hook_, CompletionQueue *cq)
CallOpRecvInitialMetadata()
Definition: call.h:404
void RecvMessage(R *message)
Definition: call.h:303
Definition: completion_queue.h:75
WriteOptions & clear_buffer_hint()
Clears flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call.h:116
Default argument for CallOpSet.
Definition: call.h:153
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:348
CallOpServerSendStatus()
Definition: call.h:362
void SendInitialMetadata(const std::multimap< grpc::string, grpc::string > &metadata)
Definition: call.h:163
void FillMetadataMap(grpc_metadata_array *arr, std::multimap< grpc::string, grpc::string > *metadata)
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call.h:107
grpc::string error_message() const
Definition: status.h:53
CallOpSendMessage()
Definition: call.h:192
std::string string
Definition: config.h:112
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:311
CompletionQueue * cq()
Definition: call.h:575
void FinishOp(bool *status, int max_message_size)
Definition: call.h:319
WriteOptions & clear_no_compression()
Clears flag for the disabling of compression for the next message write.
Definition: call.h:90
int max_message_size_
Definition: call.h:494
bool FinalizeResult(void **tag, bool *status) GRPC_OVERRIDE
Definition: call.h:551
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the lower layers.
Definition: call.h:482
Definition: call.h:431
void FinishOp(bool *status, int max_message_size)
Definition: call.h:354
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:246
bool FinalizeResult(void **tag, bool *status) GRPC_OVERRIDE
Definition: call.h:524
void Clear()
Clear all flags.
Definition: call.h:70
void FinishOp(bool *status, int max_message_size)
Definition: call.h:213
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:375
WriteOptions()
Definition: call.h:66
Definition: call.h:360
#define GRPC_FINAL
Definition: config.h:71
void FillOps(grpc_op *ops, size_t *nops) GRPC_OVERRIDE
Fills in grpc_op, starting from ops[*nops] and moving upwards.
Definition: call.h:515
grpc_metadata * FillMetadataArray(const std::multimap< grpc::string, grpc::string > &metadata)
grpc_call * call()
Definition: call.h:574
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:171
WriteOptions & set_no_compression()
Sets flag for the disabling of compression for the next message write.
Definition: call.h:82
Definition: client_context.h:74
WriteOptions & operator=(const WriteOptions &rhs)
Definition: call.h:129
gpr_uint32 flags() const
Returns raw flags bitset.
Definition: call.h:75
void FinishOp(bool *status, int max_message_size)
Definition: call.h:458
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:64
Definition: call.h:190
bool send_
Definition: call.h:185
CallOpClientRecvStatus()
Definition: call.h:433
bool get_no_compression() const
Get value for the flag indicating whether compression for the next message write is forcefully disabled.
Definition: call.h:99
Status Deserialize(grpc_byte_buffer *buf, int max_message_size) GRPC_OVERRIDE
Definition: call.h:288
CallOpSet()
Definition: call.h:514
void FinishOp(bool *status, int max_message_size)
Definition: call.h:388
Definition: call.h:341
CallOpSendInitialMetadata()
Definition: call.h:161
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:441
Definition: call.h:565
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:155
void FinishOp(bool *status, int max_message_size)
Definition: call.h:420
CallOpSetInterface()
Definition: call.h:484
WriteOptions(const WriteOptions &other)
Definition: call.h:67
Primary implementation of CallOpSetInterface.
Definition: call.h:506
void ClientSendClose()
Definition: call.h:345
Definition: call.h:237
int max_message_size()
Definition: call.h:577
Per-message write options.
Definition: call.h:64
CallOpClientSendClose()
Definition: call.h:343
bool get_buffer_hint() const
Get value for the flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call.h:125
CallOpRecvMessage()
Definition: call.h:239
StatusCode error_code() const
Definition: status.h:52
Definition: completion_queue.h:87
Status SendMessage(const M &message, const WriteOptions &options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call.h:225
virtual void PerformOpsOnCall(CallOpSetInterface *ops, Call *call)=0
void FinishOp(bool *status, int max_message_size)
Definition: call.h:156
bool got_message
Definition: call.h:243
void ClientRecvStatus(ClientContext *context, Status *status)
Definition: call.h:435
void PerformOps(CallOpSetInterface *ops)
void FinishOp(bool *status, int max_message_size)
Definition: call.h:254
void set_max_message_size(int max_message_size)
Definition: call.h:489
DeserializeFuncType(R *message)
Definition: call.h:287
Definition: status.h:42
Definition: call.h:159
virtual Status Deserialize(grpc_byte_buffer *buf, int max_message_size)=0
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:412
void RecvMessage(R *message)
Definition: call.h:241
CallOpGenericRecvMessage()
Definition: call.h:300
bool got_message
Definition: call.h:308
A CallOpSet that does not post completions to the completion queue.
Definition: call.h:548
virtual void FillOps(grpc_op *ops, size_t *nops)=0
Fills in grpc_op, starting from ops[*nops] and moving upwards.
void set_output_tag(void *return_tag)
Definition: call.h:535
size_t initial_metadata_count_
Definition: call.h:186
#define GRPC_OVERRIDE
Definition: config.h:77
Definition: call.h:558
void RecvInitialMetadata(ClientContext *context)
Definition: call.h:406
Definition: call.h:402
void AddOp(grpc_op *ops, size_t *nops)
Definition: call.h:204
grpc_metadata * initial_metadata_
Definition: call.h:187
void FinishOp(bool *status, int max_message_size)
Definition: call.h:179
Definition: call.h:298
virtual ~CallHook()
Definition: call.h:560