aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-29 09:07:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-03-29 09:07:14 -0700
commitd9bc2bf8252c6a29fb7fcc21be22f7ccd285e619 (patch)
treee992992b0b9995ccad398f7b7db37ff511d4efe4 /include
parent1a7692683fc40579180dee72bb2863a9bca827b6 (diff)
parentd07275764717272c451ace7d7bd10c8d9a51ab6a (diff)
Merge github.com:grpc/grpc into atomic-timers
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/impl/codegen/async_stream.h247
-rw-r--r--include/grpc++/impl/codegen/call.h48
-rw-r--r--include/grpc++/impl/codegen/client_context.h15
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h106
-rw-r--r--include/grpc/impl/codegen/grpc_types.h21
-rw-r--r--include/grpc/impl/codegen/port_platform.h8
6 files changed, 349 insertions, 96 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index 1a5cbbd45d..8f529895ca 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -101,6 +101,39 @@ class AsyncWriterInterface {
/// \param[in] msg The message to be written.
/// \param[in] tag The tag identifying the operation.
virtual void Write(const W& msg, void* tag) = 0;
+
+ /// Request the writing of \a msg using WriteOptions \a options with
+ /// identifying tag \a tag.
+ ///
+ /// Only one write may be outstanding at any given time. This means that
+ /// after calling Write, one must wait to receive \a tag from the completion
+ /// queue BEFORE calling Write again.
+ /// WriteOptions \a options is used to set the write options of this message.
+ /// This is thread-safe with respect to \a Read
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void Write(const W& msg, WriteOptions options, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with the writing
+ /// of trailing metadata, using WriteOptions \a options with identifying tag
+ /// \a tag.
+ ///
+ /// For client, WriteLast is equivalent of performing Write and WritesDone in
+ /// a single step.
+ /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+ /// until Finish is called, where \a msg and trailing metadata are coalesced
+ /// and write is initiated. Note that WriteLast can only buffer \a msg up to
+ /// the flow control window size. If \a msg size is larger than the window
+ /// size, it will be sent on wire without buffering.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] tag The tag identifying the operation.
+ void WriteLast(const W& msg, WriteOptions options, void* tag) {
+ Write(msg, options.set_last_message(), tag);
+ }
};
template <class R>
@@ -183,11 +216,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
: context_(context), call_(channel->CreateCall(method, context, cq)) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
-
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&init_ops_);
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ if (context_->initial_metadata_corked_) {
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
}
void ReadInitialMetadata(void* tag) override {
@@ -205,10 +244,21 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
void WritesDone(void* tag) override {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ write_ops_.set_output_tag(tag);
+ write_ops_.ClientSendClose();
+ call_.PerformOps(&write_ops_);
}
void Finish(Status* status, void* tag) override {
@@ -223,10 +273,9 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
CallOpClientRecvStatus>
finish_ops_;
@@ -253,10 +302,17 @@ class ClientAsyncReaderWriter final
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&init_ops_);
+ if (context_->initial_metadata_corked_) {
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
}
void ReadInitialMetadata(void* tag) override {
@@ -283,10 +339,21 @@ class ClientAsyncReaderWriter final
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
void WritesDone(void* tag) override {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ write_ops_.set_output_tag(tag);
+ write_ops_.ClientSendClose();
+ call_.PerformOps(&write_ops_);
}
void Finish(Status* status, void* tag) override {
@@ -301,11 +368,10 @@ class ClientAsyncReaderWriter final
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
@@ -395,6 +461,20 @@ class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with identifying tag \a
+ /// tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
};
template <class W>
@@ -418,29 +498,37 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+
+ EnsureInitialMetadataSent(&write_ops_);
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+ void* tag) override {
+ write_ops_.set_output_tag(tag);
+ EnsureInitialMetadataSent(&write_ops_);
+ options.set_buffer_hint();
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&write_ops_);
+ }
+
void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&finish_ops_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
@@ -448,10 +536,24 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
private:
void BindCall(Call* call) override { call_ = *call; }
+ template <class T>
+ void EnsureInitialMetadataSent(T* ops) {
+ if (!ctx_->sent_initial_metadata_) {
+ ops->SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops->set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ }
+
Call call_;
ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
@@ -462,6 +564,20 @@ class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with identifying tag \a
+ /// tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
};
template <class W, class R>
@@ -492,29 +608,36 @@ class ServerAsyncReaderWriter final
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ EnsureInitialMetadataSent(&write_ops_);
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+ void* tag) override {
+ write_ops_.set_output_tag(tag);
+ EnsureInitialMetadataSent(&write_ops_);
+ options.set_buffer_hint();
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&write_ops_);
+ }
+
void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&finish_ops_);
+
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
@@ -524,11 +647,25 @@ class ServerAsyncReaderWriter final
void BindCall(Call* call) override { call_ = *call; }
+ template <class T>
+ void EnsureInitialMetadataSent(T* ops) {
+ if (!ctx_->sent_initial_metadata_) {
+ ops->SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops->set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ }
+
Call call_;
ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 19a5ca2b2e..a3f2be6bb1 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -84,8 +84,9 @@ inline grpc_metadata* FillMetadataArray(
/// Per-message write options.
class WriteOptions {
public:
- WriteOptions() : flags_(0) {}
- WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
+ WriteOptions() : flags_(0), last_message_(false) {}
+ WriteOptions(const WriteOptions& other)
+ : flags_(other.flags_), last_message_(other.last_message_) {}
/// Clear all flags.
inline void Clear() { flags_ = 0; }
@@ -141,6 +142,43 @@ class WriteOptions {
/// \sa GRPC_WRITE_BUFFER_HINT
inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+ /// corked bit: aliases set_buffer_hint currently, with the intent that
+ /// set_buffer_hint will be removed in the future
+ inline WriteOptions& set_corked() {
+ SetBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ inline WriteOptions& clear_corked() {
+ ClearBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+
+ /// last-message bit: indicates this is the last message in a stream
+ /// client-side: makes Write the equivalent of performing Write, WritesDone
+ /// in a single step
+ /// server-side: hold the Write until the service handler returns (sync api)
+ /// or until Finish is called (async api)
+ inline WriteOptions& set_last_message() {
+ last_message_ = true;
+ return *this;
+ }
+
+ /// Clears flag indicating that this is the last message in a stream,
+ /// disabling coalescing.
+ inline WriteOptions& clear_last_messsage() {
+ last_message_ = false;
+ return *this;
+ }
+
+ /// Get value for the flag indicating that this is the last message, and
+ /// should be coalesced with trailing metadata.
+ ///
+ /// \sa GRPC_WRITE_LAST_MESSAGE
+ bool is_last_message() const { return last_message_; }
+
WriteOptions& operator=(const WriteOptions& rhs) {
flags_ = rhs.flags_;
return *this;
@@ -154,6 +192,7 @@ class WriteOptions {
bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
uint32_t flags_;
+ bool last_message_;
};
/// Default argument for CallOpSet. I is unused by the class, but can be
@@ -224,7 +263,7 @@ class CallOpSendMessage {
/// after use.
template <class M>
Status SendMessage(const M& message,
- const WriteOptions& options) GRPC_MUST_USE_RESULT;
+ WriteOptions options) GRPC_MUST_USE_RESULT;
template <class M>
Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
@@ -252,8 +291,7 @@ class CallOpSendMessage {
};
template <class M>
-Status CallOpSendMessage::SendMessage(const M& message,
- const WriteOptions& options) {
+Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
write_options_ = options;
return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
}
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index b91c7f65d4..3c50e6ba9d 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -281,6 +281,17 @@ class ClientContext {
/// \param algorithm The compression algorithm used for the client call.
void set_compression_algorithm(grpc_compression_algorithm algorithm);
+ /// Flag whether the initial metadata should be \a corked
+ ///
+ /// If \a corked is true, then the initial metadata will be colasced with the
+ /// write of first message in the stream.
+ ///
+ /// \param corked The flag indicating whether the initial metadata is to be
+ /// corked or not.
+ void set_initial_metadata_corked(bool corked) {
+ initial_metadata_corked_ = corked;
+ }
+
/// Return the peer uri in a string.
///
/// \warning This value is never authenticated or subject to any security
@@ -357,7 +368,8 @@ class ClientContext {
(cacheable_ ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST : 0) |
(wait_for_ready_explicitly_set_
? GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
- : 0);
+ : 0) |
+ (initial_metadata_corked_ ? GRPC_INITIAL_METADATA_CORKED : 0);
}
grpc::string authority() { return authority_; }
@@ -384,6 +396,7 @@ class ClientContext {
PropagationOptions propagation_options_;
grpc_compression_algorithm compression_algorithm_;
+ bool initial_metadata_corked_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index 4d9b074e95..ae3b8e441d 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -100,22 +100,40 @@ class WriterInterface {
public:
virtual ~WriterInterface() {}
- /// Blocking write \a msg to the stream with options.
+ /// Blocking write \a msg to the stream with WriteOptions \a options.
/// This is thread-safe with respect to \a Read
///
/// \param msg The message to be written to the stream.
- /// \param options Options affecting the write operation.
+ /// \param options The WriteOptions affecting the write operation.
///
/// \return \a true on success, \a false when the stream has been closed.
- virtual bool Write(const W& msg, const WriteOptions& options) = 0;
+ virtual bool Write(const W& msg, WriteOptions options) = 0;
- /// Blocking write \a msg to the stream with default options.
+ /// Blocking write \a msg to the stream with default write options.
/// This is thread-safe with respect to \a Read
///
/// \param msg The message to be written to the stream.
///
/// \return \a true on success, \a false when the stream has been closed.
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
+
+ /// Write \a msg and coalesce it with the writing of trailing metadata, using
+ /// WriteOptions \a options.
+ ///
+ /// For client, WriteLast is equivalent of performing Write and WritesDone in
+ /// a single step. \a msg and trailing metadata are coalesced and sent on wire
+ /// by calling this function.
+ /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+ /// until the service handler returns, where \a msg and trailing metadata are
+ /// coalesced and sent on wire. Note that WriteLast can only buffer \a msg up
+ /// to the flow control window size. If \a msg size is larger than the window
+ /// size, it will be sent on wire without buffering.
+ ///
+ /// \param[in] msg The message to be written to the stream.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ void WriteLast(const W& msg, WriteOptions options) {
+ Write(msg, options.set_last_message());
+ }
};
/// Client-side interface for streaming reads of message of type \a R.
@@ -213,11 +231,13 @@ class ClientWriter : public ClientWriterInterface<W> {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ if (!context_->initial_metadata_corked_) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
}
void WaitForInitialMetadata() {
@@ -230,11 +250,24 @@ class ClientWriter : public ClientWriterInterface<W> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
- CallOpSet<CallOpSendMessage> ops;
+ bool Write(const W& msg, WriteOptions options) override {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ ops.ClientSendClose();
+ }
+ if (context_->initial_metadata_corked_) {
+ ops.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ context_->set_initial_metadata_corked(false);
+ }
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
+
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -293,11 +326,13 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ if (!context_->initial_metadata_corked_) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
}
void WaitForInitialMetadata() override {
@@ -325,9 +360,24 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
- CallOpSet<CallOpSendMessage> ops;
- if (!ops.SendMessage(msg, options).ok()) return false;
+ bool Write(const W& msg, WriteOptions options) override {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ ops.ClientSendClose();
+ }
+ if (context_->initial_metadata_corked_) {
+ ops.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ context_->set_initial_metadata_corked(false);
+ }
+ if (!ops.SendMessage(msg, options).ok()) {
+ return false;
+ }
+
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -423,7 +473,10 @@ class ServerWriter final : public ServerWriterInterface<W> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
+ bool Write(const W& msg, WriteOptions options) override {
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
@@ -485,7 +538,10 @@ class ServerReaderWriterBody final {
return call_->cq()->Pluck(&ops) && ops.got_message;
}
- bool Write(const W& msg, const WriteOptions& options) {
+ bool Write(const W& msg, WriteOptions options) {
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
@@ -523,7 +579,7 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
bool Read(R* msg) override { return body_.Read(msg); }
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
+ bool Write(const W& msg, WriteOptions options) override {
return body_.Write(msg, options);
}
@@ -562,8 +618,7 @@ class ServerUnaryStreamer final
}
using WriterInterface<ResponseType>::Write;
- bool Write(const ResponseType& response,
- const WriteOptions& options) override {
+ bool Write(const ResponseType& response, WriteOptions options) override {
if (write_done_ || !read_done_) {
return false;
}
@@ -604,8 +659,7 @@ class ServerSplitStreamer final
}
using WriterInterface<ResponseType>::Write;
- bool Write(const ResponseType& response,
- const WriteOptions& options) override {
+ bool Write(const ResponseType& response, WriteOptions options) override {
return read_done_ && body_.Write(response, options);
}
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 887c176f1a..833ccf3bfe 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -198,14 +198,14 @@ typedef struct {
#define GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE "grpc.http2.write_buffer_size"
/** After a duration of this time the client pings the server to see if the
transport is still alive. Int valued, seconds. */
-#define GRPC_ARG_HTTP2_KEEPALIVE_TIME "grpc.http2.keepalive_time"
+#define GRPC_ARG_CLIENT_KEEPALIVE_TIME_S "grpc.client_keepalive_time"
/** After waiting for a duration of this time, if the client does not receive
the ping ack, it will close the transport. Int valued, seconds. */
-#define GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT "grpc.http2.keepalive_timeout"
+#define GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S "grpc.client_keepalive_timeout"
/** Is it permissible to send keepalive pings without any outstanding streams.
Int valued, 0(false)/1(true). */
-#define GRPC_ARG_HTTP2_KEEPALIVE_PERMIT_WITHOUT_CALLS \
- "grpc.http2.keepalive_permit_without_calls"
+#define GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS \
+ "grpc.keepalive_permit_without_calls"
/** Default authority to pass if none specified on call construction. A string.
* */
#define GRPC_ARG_DEFAULT_AUTHORITY "grpc.default_authority"
@@ -316,13 +316,16 @@ typedef enum grpc_call_error {
/** Signal that GRPC_INITIAL_METADATA_WAIT_FOR_READY was explicitly set
by the calling application. */
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET (0x00000080u)
+/** Signal that the initial metadata should be corked */
+#define GRPC_INITIAL_METADATA_CORKED (0x00000100u)
/** Mask of all valid flags */
-#define GRPC_INITIAL_METADATA_USED_MASK \
- (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
- GRPC_INITIAL_METADATA_WAIT_FOR_READY | \
- GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \
- GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)
+#define GRPC_INITIAL_METADATA_USED_MASK \
+ (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY | \
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \
+ GRPC_INITIAL_METADATA_CORKED)
/** A single metadata element */
typedef struct grpc_metadata {
diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h
index 3d490db1a5..d525083cd0 100644
--- a/include/grpc/impl/codegen/port_platform.h
+++ b/include/grpc/impl/codegen/port_platform.h
@@ -364,6 +364,14 @@ typedef unsigned __int64 uint64_t;
power of two */
#define GPR_MAX_ALIGNMENT 16
+#ifndef GRPC_ARES
+#ifdef GPR_WINDOWS
+#define GRPC_ARES 0
+#else
+#define GRPC_ARES 1
+#endif
+#endif
+
#ifndef GRPC_MUST_USE_RESULT
#if defined(__GNUC__) && !defined(__MINGW32__)
#define GRPC_MUST_USE_RESULT __attribute__((warn_unused_result))