aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-31 07:50:01 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-03-31 07:50:01 -0700
commit547631eaf89709ed2dcb30e9f8fdb617617c7aff (patch)
tree3ca8547943c909d13d0c89da906d5fb2a78d3595 /include/grpc++
parent4cbf4d803013d658919d08c341747fb005ff6337 (diff)
parent66c76d653bf93fb94b108993f3bea916675147bc (diff)
Merge github.com:grpc/grpc into cpparena
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/impl/codegen/async_stream.h247
-rw-r--r--include/grpc++/impl/codegen/call.h48
-rw-r--r--include/grpc++/impl/codegen/client_context.h15
-rw-r--r--include/grpc++/impl/codegen/grpc_library.h26
-rw-r--r--include/grpc++/impl/codegen/server_context.h1
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h106
-rw-r--r--include/grpc++/support/channel_arguments.h6
7 files changed, 349 insertions, 100 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index 1a5cbbd45d..8f529895ca 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -101,6 +101,39 @@ class AsyncWriterInterface {
/// \param[in] msg The message to be written.
/// \param[in] tag The tag identifying the operation.
virtual void Write(const W& msg, void* tag) = 0;
+
+ /// Request the writing of \a msg using WriteOptions \a options with
+ /// identifying tag \a tag.
+ ///
+ /// Only one write may be outstanding at any given time. This means that
+ /// after calling Write, one must wait to receive \a tag from the completion
+ /// queue BEFORE calling Write again.
+ /// WriteOptions \a options is used to set the write options of this message.
+ /// This is thread-safe with respect to \a Read
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void Write(const W& msg, WriteOptions options, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with the writing
+ /// of trailing metadata, using WriteOptions \a options with identifying tag
+ /// \a tag.
+ ///
+ /// For client, WriteLast is equivalent of performing Write and WritesDone in
+ /// a single step.
+ /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+ /// until Finish is called, where \a msg and trailing metadata are coalesced
+ /// and write is initiated. Note that WriteLast can only buffer \a msg up to
+ /// the flow control window size. If \a msg size is larger than the window
+ /// size, it will be sent on wire without buffering.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] tag The tag identifying the operation.
+ void WriteLast(const W& msg, WriteOptions options, void* tag) {
+ Write(msg, options.set_last_message(), tag);
+ }
};
template <class R>
@@ -183,11 +216,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
: context_(context), call_(channel->CreateCall(method, context, cq)) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
-
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&init_ops_);
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ if (context_->initial_metadata_corked_) {
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
}
void ReadInitialMetadata(void* tag) override {
@@ -205,10 +244,21 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
void WritesDone(void* tag) override {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ write_ops_.set_output_tag(tag);
+ write_ops_.ClientSendClose();
+ call_.PerformOps(&write_ops_);
}
void Finish(Status* status, void* tag) override {
@@ -223,10 +273,9 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
CallOpClientRecvStatus>
finish_ops_;
@@ -253,10 +302,17 @@ class ClientAsyncReaderWriter final
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&init_ops_);
+ if (context_->initial_metadata_corked_) {
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
}
void ReadInitialMetadata(void* tag) override {
@@ -283,10 +339,21 @@ class ClientAsyncReaderWriter final
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
void WritesDone(void* tag) override {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ write_ops_.set_output_tag(tag);
+ write_ops_.ClientSendClose();
+ call_.PerformOps(&write_ops_);
}
void Finish(Status* status, void* tag) override {
@@ -301,11 +368,10 @@ class ClientAsyncReaderWriter final
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
@@ -395,6 +461,20 @@ class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with identifying tag \a
+ /// tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
};
template <class W>
@@ -418,29 +498,37 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+
+ EnsureInitialMetadataSent(&write_ops_);
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+ void* tag) override {
+ write_ops_.set_output_tag(tag);
+ EnsureInitialMetadataSent(&write_ops_);
+ options.set_buffer_hint();
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&write_ops_);
+ }
+
void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&finish_ops_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
@@ -448,10 +536,24 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
private:
void BindCall(Call* call) override { call_ = *call; }
+ template <class T>
+ void EnsureInitialMetadataSent(T* ops) {
+ if (!ctx_->sent_initial_metadata_) {
+ ops->SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops->set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ }
+
Call call_;
ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
@@ -462,6 +564,20 @@ class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with identifying tag \a
+ /// tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
};
template <class W, class R>
@@ -492,29 +608,36 @@ class ServerAsyncReaderWriter final
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ EnsureInitialMetadataSent(&write_ops_);
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+ void* tag) override {
+ write_ops_.set_output_tag(tag);
+ EnsureInitialMetadataSent(&write_ops_);
+ options.set_buffer_hint();
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&write_ops_);
+ }
+
void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&finish_ops_);
+
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
@@ -524,11 +647,25 @@ class ServerAsyncReaderWriter final
void BindCall(Call* call) override { call_ = *call; }
+ template <class T>
+ void EnsureInitialMetadataSent(T* ops) {
+ if (!ctx_->sent_initial_metadata_) {
+ ops->SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops->set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ }
+
Call call_;
ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 04296dbe15..9c8611a116 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -86,8 +86,9 @@ inline grpc_metadata* FillMetadataArray(
/// Per-message write options.
class WriteOptions {
public:
- WriteOptions() : flags_(0) {}
- WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
+ WriteOptions() : flags_(0), last_message_(false) {}
+ WriteOptions(const WriteOptions& other)
+ : flags_(other.flags_), last_message_(other.last_message_) {}
/// Clear all flags.
inline void Clear() { flags_ = 0; }
@@ -143,6 +144,43 @@ class WriteOptions {
/// \sa GRPC_WRITE_BUFFER_HINT
inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+ /// corked bit: aliases set_buffer_hint currently, with the intent that
+ /// set_buffer_hint will be removed in the future
+ inline WriteOptions& set_corked() {
+ SetBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ inline WriteOptions& clear_corked() {
+ ClearBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+
+ /// last-message bit: indicates this is the last message in a stream
+ /// client-side: makes Write the equivalent of performing Write, WritesDone
+ /// in a single step
+ /// server-side: hold the Write until the service handler returns (sync api)
+ /// or until Finish is called (async api)
+ inline WriteOptions& set_last_message() {
+ last_message_ = true;
+ return *this;
+ }
+
+ /// Clears flag indicating that this is the last message in a stream,
+ /// disabling coalescing.
+ inline WriteOptions& clear_last_messsage() {
+ last_message_ = false;
+ return *this;
+ }
+
+ /// Get value for the flag indicating that this is the last message, and
+ /// should be coalesced with trailing metadata.
+ ///
+ /// \sa GRPC_WRITE_LAST_MESSAGE
+ bool is_last_message() const { return last_message_; }
+
WriteOptions& operator=(const WriteOptions& rhs) {
flags_ = rhs.flags_;
return *this;
@@ -156,6 +194,7 @@ class WriteOptions {
bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
uint32_t flags_;
+ bool last_message_;
};
/// Default argument for CallOpSet. I is unused by the class, but can be
@@ -226,7 +265,7 @@ class CallOpSendMessage {
/// after use.
template <class M>
Status SendMessage(const M& message,
- const WriteOptions& options) GRPC_MUST_USE_RESULT;
+ WriteOptions options) GRPC_MUST_USE_RESULT;
template <class M>
Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
@@ -254,8 +293,7 @@ class CallOpSendMessage {
};
template <class M>
-Status CallOpSendMessage::SendMessage(const M& message,
- const WriteOptions& options) {
+Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
write_options_ = options;
return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
}
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index b91c7f65d4..3c50e6ba9d 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -281,6 +281,17 @@ class ClientContext {
/// \param algorithm The compression algorithm used for the client call.
void set_compression_algorithm(grpc_compression_algorithm algorithm);
+ /// Flag whether the initial metadata should be \a corked
+ ///
+ /// If \a corked is true, then the initial metadata will be colasced with the
+ /// write of first message in the stream.
+ ///
+ /// \param corked The flag indicating whether the initial metadata is to be
+ /// corked or not.
+ void set_initial_metadata_corked(bool corked) {
+ initial_metadata_corked_ = corked;
+ }
+
/// Return the peer uri in a string.
///
/// \warning This value is never authenticated or subject to any security
@@ -357,7 +368,8 @@ class ClientContext {
(cacheable_ ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST : 0) |
(wait_for_ready_explicitly_set_
? GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
- : 0);
+ : 0) |
+ (initial_metadata_corked_ ? GRPC_INITIAL_METADATA_CORKED : 0);
}
grpc::string authority() { return authority_; }
@@ -384,6 +396,7 @@ class ClientContext {
PropagationOptions propagation_options_;
grpc_compression_algorithm compression_algorithm_;
+ bool initial_metadata_corked_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/grpc_library.h b/include/grpc++/impl/codegen/grpc_library.h
index 2b11aff214..3735d04e8c 100644
--- a/include/grpc++/impl/codegen/grpc_library.h
+++ b/include/grpc++/impl/codegen/grpc_library.h
@@ -51,18 +51,26 @@ extern GrpcLibraryInterface* g_glip;
/// Classes that require gRPC to be initialized should inherit from this class.
class GrpcLibraryCodegen {
public:
- GrpcLibraryCodegen() {
- GPR_CODEGEN_ASSERT(g_glip &&
- "gRPC library not initialized. See "
- "grpc::internal::GrpcLibraryInitializer.");
- g_glip->init();
+ GrpcLibraryCodegen(bool call_grpc_init = true) : grpc_init_called_(false) {
+ if (call_grpc_init) {
+ GPR_CODEGEN_ASSERT(g_glip &&
+ "gRPC library not initialized. See "
+ "grpc::internal::GrpcLibraryInitializer.");
+ g_glip->init();
+ grpc_init_called_ = true;
+ }
}
virtual ~GrpcLibraryCodegen() {
- GPR_CODEGEN_ASSERT(g_glip &&
- "gRPC library not initialized. See "
- "grpc::internal::GrpcLibraryInitializer.");
- g_glip->shutdown();
+ if (grpc_init_called_) {
+ GPR_CODEGEN_ASSERT(g_glip &&
+ "gRPC library not initialized. See "
+ "grpc::internal::GrpcLibraryInitializer.");
+ g_glip->shutdown();
+ }
}
+
+ private:
+ bool grpc_init_called_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h
index bf9a9b6f1a..91f0be06e7 100644
--- a/include/grpc++/impl/codegen/server_context.h
+++ b/include/grpc++/impl/codegen/server_context.h
@@ -39,7 +39,6 @@
#include <vector>
#include <grpc/impl/codegen/compression_types.h>
-#include <grpc/load_reporting.h>
#include <grpc++/impl/codegen/config.h>
#include <grpc++/impl/codegen/create_auth_context.h>
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index 4d9b074e95..ae3b8e441d 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -100,22 +100,40 @@ class WriterInterface {
public:
virtual ~WriterInterface() {}
- /// Blocking write \a msg to the stream with options.
+ /// Blocking write \a msg to the stream with WriteOptions \a options.
/// This is thread-safe with respect to \a Read
///
/// \param msg The message to be written to the stream.
- /// \param options Options affecting the write operation.
+ /// \param options The WriteOptions affecting the write operation.
///
/// \return \a true on success, \a false when the stream has been closed.
- virtual bool Write(const W& msg, const WriteOptions& options) = 0;
+ virtual bool Write(const W& msg, WriteOptions options) = 0;
- /// Blocking write \a msg to the stream with default options.
+ /// Blocking write \a msg to the stream with default write options.
/// This is thread-safe with respect to \a Read
///
/// \param msg The message to be written to the stream.
///
/// \return \a true on success, \a false when the stream has been closed.
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
+
+ /// Write \a msg and coalesce it with the writing of trailing metadata, using
+ /// WriteOptions \a options.
+ ///
+ /// For client, WriteLast is equivalent of performing Write and WritesDone in
+ /// a single step. \a msg and trailing metadata are coalesced and sent on wire
+ /// by calling this function.
+ /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+ /// until the service handler returns, where \a msg and trailing metadata are
+ /// coalesced and sent on wire. Note that WriteLast can only buffer \a msg up
+ /// to the flow control window size. If \a msg size is larger than the window
+ /// size, it will be sent on wire without buffering.
+ ///
+ /// \param[in] msg The message to be written to the stream.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ void WriteLast(const W& msg, WriteOptions options) {
+ Write(msg, options.set_last_message());
+ }
};
/// Client-side interface for streaming reads of message of type \a R.
@@ -213,11 +231,13 @@ class ClientWriter : public ClientWriterInterface<W> {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ if (!context_->initial_metadata_corked_) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
}
void WaitForInitialMetadata() {
@@ -230,11 +250,24 @@ class ClientWriter : public ClientWriterInterface<W> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
- CallOpSet<CallOpSendMessage> ops;
+ bool Write(const W& msg, WriteOptions options) override {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ ops.ClientSendClose();
+ }
+ if (context_->initial_metadata_corked_) {
+ ops.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ context_->set_initial_metadata_corked(false);
+ }
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
+
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -293,11 +326,13 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ if (!context_->initial_metadata_corked_) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
}
void WaitForInitialMetadata() override {
@@ -325,9 +360,24 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
- CallOpSet<CallOpSendMessage> ops;
- if (!ops.SendMessage(msg, options).ok()) return false;
+ bool Write(const W& msg, WriteOptions options) override {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ ops.ClientSendClose();
+ }
+ if (context_->initial_metadata_corked_) {
+ ops.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ context_->set_initial_metadata_corked(false);
+ }
+ if (!ops.SendMessage(msg, options).ok()) {
+ return false;
+ }
+
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -423,7 +473,10 @@ class ServerWriter final : public ServerWriterInterface<W> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
+ bool Write(const W& msg, WriteOptions options) override {
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
@@ -485,7 +538,10 @@ class ServerReaderWriterBody final {
return call_->cq()->Pluck(&ops) && ops.got_message;
}
- bool Write(const W& msg, const WriteOptions& options) {
+ bool Write(const W& msg, WriteOptions options) {
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
@@ -523,7 +579,7 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
bool Read(R* msg) override { return body_.Read(msg); }
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
+ bool Write(const W& msg, WriteOptions options) override {
return body_.Write(msg, options);
}
@@ -562,8 +618,7 @@ class ServerUnaryStreamer final
}
using WriterInterface<ResponseType>::Write;
- bool Write(const ResponseType& response,
- const WriteOptions& options) override {
+ bool Write(const ResponseType& response, WriteOptions options) override {
if (write_done_ || !read_done_) {
return false;
}
@@ -604,8 +659,7 @@ class ServerSplitStreamer final
}
using WriterInterface<ResponseType>::Write;
- bool Write(const ResponseType& response,
- const WriteOptions& options) override {
+ bool Write(const ResponseType& response, WriteOptions options) override {
return read_done_ && body_.Write(response, options);
}
diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h
index efdf7772ad..61307d6194 100644
--- a/include/grpc++/support/channel_arguments.h
+++ b/include/grpc++/support/channel_arguments.h
@@ -54,7 +54,7 @@ class ResourceQuota;
class ChannelArguments {
public:
ChannelArguments();
- ~ChannelArguments() {}
+ ~ChannelArguments();
ChannelArguments(const ChannelArguments& other);
ChannelArguments& operator=(ChannelArguments other) {
@@ -117,10 +117,10 @@ class ChannelArguments {
/// Return (by value) a c grpc_channel_args structure which points to
/// arguments owned by this ChannelArguments instance
- grpc_channel_args c_channel_args() {
+ grpc_channel_args c_channel_args() const {
grpc_channel_args out;
out.num_args = args_.size();
- out.args = args_.empty() ? NULL : &args_[0];
+ out.args = args_.empty() ? NULL : const_cast<grpc_arg*>(&args_[0]);
return out;
}