aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-31 07:50:01 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-03-31 07:50:01 -0700
commit547631eaf89709ed2dcb30e9f8fdb617617c7aff (patch)
tree3ca8547943c909d13d0c89da906d5fb2a78d3595 /include
parent4cbf4d803013d658919d08c341747fb005ff6337 (diff)
parent66c76d653bf93fb94b108993f3bea916675147bc (diff)
Merge github.com:grpc/grpc into cpparena
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/impl/codegen/async_stream.h247
-rw-r--r--include/grpc++/impl/codegen/call.h48
-rw-r--r--include/grpc++/impl/codegen/client_context.h15
-rw-r--r--include/grpc++/impl/codegen/grpc_library.h26
-rw-r--r--include/grpc++/impl/codegen/server_context.h1
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h106
-rw-r--r--include/grpc++/support/channel_arguments.h6
-rw-r--r--include/grpc/grpc.h65
-rw-r--r--include/grpc/impl/codegen/atm.h5
-rw-r--r--include/grpc/impl/codegen/atm_gcc_atomic.h5
-rw-r--r--include/grpc/impl/codegen/atm_gcc_sync.h1
-rw-r--r--include/grpc/impl/codegen/grpc_types.h26
-rw-r--r--include/grpc/impl/codegen/port_platform.h8
13 files changed, 450 insertions, 109 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index 1a5cbbd45d..8f529895ca 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -101,6 +101,39 @@ class AsyncWriterInterface {
/// \param[in] msg The message to be written.
/// \param[in] tag The tag identifying the operation.
virtual void Write(const W& msg, void* tag) = 0;
+
+ /// Request the writing of \a msg using WriteOptions \a options with
+ /// identifying tag \a tag.
+ ///
+ /// Only one write may be outstanding at any given time. This means that
+ /// after calling Write, one must wait to receive \a tag from the completion
+ /// queue BEFORE calling Write again.
+ /// WriteOptions \a options is used to set the write options of this message.
+ /// This is thread-safe with respect to \a Read
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void Write(const W& msg, WriteOptions options, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with the writing
+ /// of trailing metadata, using WriteOptions \a options with identifying tag
+ /// \a tag.
+ ///
+ /// For client, WriteLast is equivalent of performing Write and WritesDone in
+ /// a single step.
+ /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+ /// until Finish is called, where \a msg and trailing metadata are coalesced
+ /// and write is initiated. Note that WriteLast can only buffer \a msg up to
+ /// the flow control window size. If \a msg size is larger than the window
+ /// size, it will be sent on wire without buffering.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] tag The tag identifying the operation.
+ void WriteLast(const W& msg, WriteOptions options, void* tag) {
+ Write(msg, options.set_last_message(), tag);
+ }
};
template <class R>
@@ -183,11 +216,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
: context_(context), call_(channel->CreateCall(method, context, cq)) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
-
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&init_ops_);
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ if (context_->initial_metadata_corked_) {
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
}
void ReadInitialMetadata(void* tag) override {
@@ -205,10 +244,21 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
void WritesDone(void* tag) override {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ write_ops_.set_output_tag(tag);
+ write_ops_.ClientSendClose();
+ call_.PerformOps(&write_ops_);
}
void Finish(Status* status, void* tag) override {
@@ -223,10 +273,9 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
CallOpClientRecvStatus>
finish_ops_;
@@ -253,10 +302,17 @@ class ClientAsyncReaderWriter final
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&init_ops_);
+ if (context_->initial_metadata_corked_) {
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
}
void ReadInitialMetadata(void* tag) override {
@@ -283,10 +339,21 @@ class ClientAsyncReaderWriter final
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
void WritesDone(void* tag) override {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ write_ops_.set_output_tag(tag);
+ write_ops_.ClientSendClose();
+ call_.PerformOps(&write_ops_);
}
void Finish(Status* status, void* tag) override {
@@ -301,11 +368,10 @@ class ClientAsyncReaderWriter final
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
@@ -395,6 +461,20 @@ class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with identifying tag \a
+ /// tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
};
template <class W>
@@ -418,29 +498,37 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+
+ EnsureInitialMetadataSent(&write_ops_);
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+ void* tag) override {
+ write_ops_.set_output_tag(tag);
+ EnsureInitialMetadataSent(&write_ops_);
+ options.set_buffer_hint();
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&write_ops_);
+ }
+
void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&finish_ops_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
@@ -448,10 +536,24 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
private:
void BindCall(Call* call) override { call_ = *call; }
+ template <class T>
+ void EnsureInitialMetadataSent(T* ops) {
+ if (!ctx_->sent_initial_metadata_) {
+ ops->SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops->set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ }
+
Call call_;
ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
@@ -462,6 +564,20 @@ class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with identifying tag \a
+ /// tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
};
template <class W, class R>
@@ -492,29 +608,36 @@ class ServerAsyncReaderWriter final
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ EnsureInitialMetadataSent(&write_ops_);
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+ void* tag) override {
+ write_ops_.set_output_tag(tag);
+ EnsureInitialMetadataSent(&write_ops_);
+ options.set_buffer_hint();
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&write_ops_);
+ }
+
void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&finish_ops_);
+
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
@@ -524,11 +647,25 @@ class ServerAsyncReaderWriter final
void BindCall(Call* call) override { call_ = *call; }
+ template <class T>
+ void EnsureInitialMetadataSent(T* ops) {
+ if (!ctx_->sent_initial_metadata_) {
+ ops->SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops->set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ }
+
Call call_;
ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 04296dbe15..9c8611a116 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -86,8 +86,9 @@ inline grpc_metadata* FillMetadataArray(
/// Per-message write options.
class WriteOptions {
public:
- WriteOptions() : flags_(0) {}
- WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
+ WriteOptions() : flags_(0), last_message_(false) {}
+ WriteOptions(const WriteOptions& other)
+ : flags_(other.flags_), last_message_(other.last_message_) {}
/// Clear all flags.
inline void Clear() { flags_ = 0; }
@@ -143,6 +144,43 @@ class WriteOptions {
/// \sa GRPC_WRITE_BUFFER_HINT
inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+ /// corked bit: aliases set_buffer_hint currently, with the intent that
+ /// set_buffer_hint will be removed in the future
+ inline WriteOptions& set_corked() {
+ SetBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ inline WriteOptions& clear_corked() {
+ ClearBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+
+ /// last-message bit: indicates this is the last message in a stream
+ /// client-side: makes Write the equivalent of performing Write, WritesDone
+ /// in a single step
+ /// server-side: hold the Write until the service handler returns (sync api)
+ /// or until Finish is called (async api)
+ inline WriteOptions& set_last_message() {
+ last_message_ = true;
+ return *this;
+ }
+
+ /// Clears flag indicating that this is the last message in a stream,
+ /// disabling coalescing.
+ inline WriteOptions& clear_last_messsage() {
+ last_message_ = false;
+ return *this;
+ }
+
+ /// Get value for the flag indicating that this is the last message, and
+ /// should be coalesced with trailing metadata.
+ ///
+ /// \sa GRPC_WRITE_LAST_MESSAGE
+ bool is_last_message() const { return last_message_; }
+
WriteOptions& operator=(const WriteOptions& rhs) {
flags_ = rhs.flags_;
return *this;
@@ -156,6 +194,7 @@ class WriteOptions {
bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
uint32_t flags_;
+ bool last_message_;
};
/// Default argument for CallOpSet. I is unused by the class, but can be
@@ -226,7 +265,7 @@ class CallOpSendMessage {
/// after use.
template <class M>
Status SendMessage(const M& message,
- const WriteOptions& options) GRPC_MUST_USE_RESULT;
+ WriteOptions options) GRPC_MUST_USE_RESULT;
template <class M>
Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
@@ -254,8 +293,7 @@ class CallOpSendMessage {
};
template <class M>
-Status CallOpSendMessage::SendMessage(const M& message,
- const WriteOptions& options) {
+Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
write_options_ = options;
return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
}
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index b91c7f65d4..3c50e6ba9d 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -281,6 +281,17 @@ class ClientContext {
/// \param algorithm The compression algorithm used for the client call.
void set_compression_algorithm(grpc_compression_algorithm algorithm);
+ /// Flag whether the initial metadata should be \a corked
+ ///
+ /// If \a corked is true, then the initial metadata will be colasced with the
+ /// write of first message in the stream.
+ ///
+ /// \param corked The flag indicating whether the initial metadata is to be
+ /// corked or not.
+ void set_initial_metadata_corked(bool corked) {
+ initial_metadata_corked_ = corked;
+ }
+
/// Return the peer uri in a string.
///
/// \warning This value is never authenticated or subject to any security
@@ -357,7 +368,8 @@ class ClientContext {
(cacheable_ ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST : 0) |
(wait_for_ready_explicitly_set_
? GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
- : 0);
+ : 0) |
+ (initial_metadata_corked_ ? GRPC_INITIAL_METADATA_CORKED : 0);
}
grpc::string authority() { return authority_; }
@@ -384,6 +396,7 @@ class ClientContext {
PropagationOptions propagation_options_;
grpc_compression_algorithm compression_algorithm_;
+ bool initial_metadata_corked_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/grpc_library.h b/include/grpc++/impl/codegen/grpc_library.h
index 2b11aff214..3735d04e8c 100644
--- a/include/grpc++/impl/codegen/grpc_library.h
+++ b/include/grpc++/impl/codegen/grpc_library.h
@@ -51,18 +51,26 @@ extern GrpcLibraryInterface* g_glip;
/// Classes that require gRPC to be initialized should inherit from this class.
class GrpcLibraryCodegen {
public:
- GrpcLibraryCodegen() {
- GPR_CODEGEN_ASSERT(g_glip &&
- "gRPC library not initialized. See "
- "grpc::internal::GrpcLibraryInitializer.");
- g_glip->init();
+ GrpcLibraryCodegen(bool call_grpc_init = true) : grpc_init_called_(false) {
+ if (call_grpc_init) {
+ GPR_CODEGEN_ASSERT(g_glip &&
+ "gRPC library not initialized. See "
+ "grpc::internal::GrpcLibraryInitializer.");
+ g_glip->init();
+ grpc_init_called_ = true;
+ }
}
virtual ~GrpcLibraryCodegen() {
- GPR_CODEGEN_ASSERT(g_glip &&
- "gRPC library not initialized. See "
- "grpc::internal::GrpcLibraryInitializer.");
- g_glip->shutdown();
+ if (grpc_init_called_) {
+ GPR_CODEGEN_ASSERT(g_glip &&
+ "gRPC library not initialized. See "
+ "grpc::internal::GrpcLibraryInitializer.");
+ g_glip->shutdown();
+ }
}
+
+ private:
+ bool grpc_init_called_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h
index bf9a9b6f1a..91f0be06e7 100644
--- a/include/grpc++/impl/codegen/server_context.h
+++ b/include/grpc++/impl/codegen/server_context.h
@@ -39,7 +39,6 @@
#include <vector>
#include <grpc/impl/codegen/compression_types.h>
-#include <grpc/load_reporting.h>
#include <grpc++/impl/codegen/config.h>
#include <grpc++/impl/codegen/create_auth_context.h>
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index 4d9b074e95..ae3b8e441d 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -100,22 +100,40 @@ class WriterInterface {
public:
virtual ~WriterInterface() {}
- /// Blocking write \a msg to the stream with options.
+ /// Blocking write \a msg to the stream with WriteOptions \a options.
/// This is thread-safe with respect to \a Read
///
/// \param msg The message to be written to the stream.
- /// \param options Options affecting the write operation.
+ /// \param options The WriteOptions affecting the write operation.
///
/// \return \a true on success, \a false when the stream has been closed.
- virtual bool Write(const W& msg, const WriteOptions& options) = 0;
+ virtual bool Write(const W& msg, WriteOptions options) = 0;
- /// Blocking write \a msg to the stream with default options.
+ /// Blocking write \a msg to the stream with default write options.
/// This is thread-safe with respect to \a Read
///
/// \param msg The message to be written to the stream.
///
/// \return \a true on success, \a false when the stream has been closed.
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
+
+ /// Write \a msg and coalesce it with the writing of trailing metadata, using
+ /// WriteOptions \a options.
+ ///
+ /// For client, WriteLast is equivalent of performing Write and WritesDone in
+ /// a single step. \a msg and trailing metadata are coalesced and sent on wire
+ /// by calling this function.
+ /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+ /// until the service handler returns, where \a msg and trailing metadata are
+ /// coalesced and sent on wire. Note that WriteLast can only buffer \a msg up
+ /// to the flow control window size. If \a msg size is larger than the window
+ /// size, it will be sent on wire without buffering.
+ ///
+ /// \param[in] msg The message to be written to the stream.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ void WriteLast(const W& msg, WriteOptions options) {
+ Write(msg, options.set_last_message());
+ }
};
/// Client-side interface for streaming reads of message of type \a R.
@@ -213,11 +231,13 @@ class ClientWriter : public ClientWriterInterface<W> {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ if (!context_->initial_metadata_corked_) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
}
void WaitForInitialMetadata() {
@@ -230,11 +250,24 @@ class ClientWriter : public ClientWriterInterface<W> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
- CallOpSet<CallOpSendMessage> ops;
+ bool Write(const W& msg, WriteOptions options) override {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ ops.ClientSendClose();
+ }
+ if (context_->initial_metadata_corked_) {
+ ops.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ context_->set_initial_metadata_corked(false);
+ }
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
+
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -293,11 +326,13 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ if (!context_->initial_metadata_corked_) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
}
void WaitForInitialMetadata() override {
@@ -325,9 +360,24 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
- CallOpSet<CallOpSendMessage> ops;
- if (!ops.SendMessage(msg, options).ok()) return false;
+ bool Write(const W& msg, WriteOptions options) override {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ ops.ClientSendClose();
+ }
+ if (context_->initial_metadata_corked_) {
+ ops.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ context_->set_initial_metadata_corked(false);
+ }
+ if (!ops.SendMessage(msg, options).ok()) {
+ return false;
+ }
+
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -423,7 +473,10 @@ class ServerWriter final : public ServerWriterInterface<W> {
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
+ bool Write(const W& msg, WriteOptions options) override {
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
@@ -485,7 +538,10 @@ class ServerReaderWriterBody final {
return call_->cq()->Pluck(&ops) && ops.got_message;
}
- bool Write(const W& msg, const WriteOptions& options) {
+ bool Write(const W& msg, WriteOptions options) {
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
@@ -523,7 +579,7 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
bool Read(R* msg) override { return body_.Read(msg); }
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
+ bool Write(const W& msg, WriteOptions options) override {
return body_.Write(msg, options);
}
@@ -562,8 +618,7 @@ class ServerUnaryStreamer final
}
using WriterInterface<ResponseType>::Write;
- bool Write(const ResponseType& response,
- const WriteOptions& options) override {
+ bool Write(const ResponseType& response, WriteOptions options) override {
if (write_done_ || !read_done_) {
return false;
}
@@ -604,8 +659,7 @@ class ServerSplitStreamer final
}
using WriterInterface<ResponseType>::Write;
- bool Write(const ResponseType& response,
- const WriteOptions& options) override {
+ bool Write(const ResponseType& response, WriteOptions options) override {
return read_done_ && body_.Write(response, options);
}
diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h
index efdf7772ad..61307d6194 100644
--- a/include/grpc++/support/channel_arguments.h
+++ b/include/grpc++/support/channel_arguments.h
@@ -54,7 +54,7 @@ class ResourceQuota;
class ChannelArguments {
public:
ChannelArguments();
- ~ChannelArguments() {}
+ ~ChannelArguments();
ChannelArguments(const ChannelArguments& other);
ChannelArguments& operator=(ChannelArguments other) {
@@ -117,10 +117,10 @@ class ChannelArguments {
/// Return (by value) a c grpc_channel_args structure which points to
/// arguments owned by this ChannelArguments instance
- grpc_channel_args c_channel_args() {
+ grpc_channel_args c_channel_args() const {
grpc_channel_args out;
out.num_args = args_.size();
- out.args = args_.empty() ? NULL : &args_[0];
+ out.args = args_.empty() ? NULL : const_cast<grpc_arg*>(&args_[0]);
return out;
}
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index d603f89c28..3430cd31e9 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -93,6 +93,71 @@ GRPCAPI const char *grpc_version_string(void);
/** Return a string specifying what the 'g' in gRPC stands for */
GRPCAPI const char *grpc_g_stands_for(void);
+/** Specifies the type of APIs to use to pop events from the completion queue */
+typedef enum {
+ /* Events are popped out by calling grpc_completion_queue_next() API ONLY */
+ GRPC_CQ_NEXT = 1,
+
+ /* Events are popped out by calling grpc_completion_queue_pluck() API ONLY */
+ GRPC_CQ_PLUCK
+} grpc_cq_completion_type;
+
+/** Completion queues internally MAY maintain a set of file descriptors in a
+ structure called 'pollset'. This enum specifies if a completion queue has an
+ associated pollset and any restrictions on the type of file descriptors that
+ can be present in the pollset.
+
+ I/O progress can only be made when grpc_completion_queue_next() or
+ grpc_completion_queue_pluck() are called on the completion queue (unless the
+ grpc_cq_polling_type is GRPC_CQ_NON_POLLING) and hence it is very important
+ to actively call these APIs */
+typedef enum {
+ /** The completion queue will have an associated pollset and there is no
+ restriction on the type of file descriptors the pollset may contain */
+ GRPC_CQ_DEFAULT_POLLING,
+
+ /* Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will
+ not contain any 'listening file descriptors' (i.e file descriptors used to
+ listen to incoming channels) */
+ GRPC_CQ_NON_LISTENING,
+
+ /* The completion queue will not have an associated pollset. Note that
+ grpc_completion_queue_next() or grpc_completion_queue_pluck() MUST still be
+ called to pop events from the completion queue; it is not required to call
+ them actively to make I/O progress */
+ GRPC_CQ_NON_POLLING
+} grpc_cq_polling_type;
+
+#define GRPC_CQ_CURRENT_VERSION 1
+typedef struct grpc_completion_queue_attributes {
+ /* The version number of this structure. More fields might be added to this
+ structure in future. */
+ int version; /* Set to GRPC_CQ_CURRENT_VERSION */
+
+ grpc_cq_completion_type cq_completion_type;
+
+ grpc_cq_polling_type cq_polling_type;
+} grpc_completion_queue_attributes;
+
+/** The completion queue factory structure is opaque to the callers of grpc */
+typedef struct grpc_completion_queue_factory grpc_completion_queue_factory;
+
+/** Returns the completion queue factory based on the attributes. MAY return a
+ NULL if no factory can be found */
+GRPCAPI const grpc_completion_queue_factory *
+grpc_completion_queue_factory_lookup(
+ const grpc_completion_queue_attributes *attributes);
+
+/** Helper function to create a completion queue with grpc_cq_completion_type
+ of GRPC_CQ_NEXT and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING */
+GRPCAPI grpc_completion_queue *grpc_completion_queue_create_for_next(
+ void *reserved);
+
+/** Helper function to create a completion queue with grpc_cq_completion_type
+ of GRPC_CQ_PLUCK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING */
+GRPCAPI grpc_completion_queue *grpc_completion_queue_create_for_pluck(
+ void *reserved);
+
/** Create a completion queue */
GRPCAPI grpc_completion_queue *grpc_completion_queue_create(void *reserved);
diff --git a/include/grpc/impl/codegen/atm.h b/include/grpc/impl/codegen/atm.h
index ae00fb0f16..4bd572d6d1 100644
--- a/include/grpc/impl/codegen/atm.h
+++ b/include/grpc/impl/codegen/atm.h
@@ -92,4 +92,9 @@
#error could not determine platform for atm
#endif
+/** Adds \a delta to \a *value, clamping the result to the range specified
+ by \a min and \a max. Returns the new value. */
+gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta,
+ gpr_atm min, gpr_atm max);
+
#endif /* GRPC_IMPL_CODEGEN_ATM_H */
diff --git a/include/grpc/impl/codegen/atm_gcc_atomic.h b/include/grpc/impl/codegen/atm_gcc_atomic.h
index 4bd3b25741..a486258c77 100644
--- a/include/grpc/impl/codegen/atm_gcc_atomic.h
+++ b/include/grpc/impl/codegen/atm_gcc_atomic.h
@@ -85,6 +85,11 @@ static __inline int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
p, &o, n, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
}
+static __inline int gpr_atm_full_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
+ return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n(
+ p, &o, n, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED));
+}
+
#define gpr_atm_full_xchg(p, n) \
GPR_ATM_INC_CAS_THEN(__atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL))
diff --git a/include/grpc/impl/codegen/atm_gcc_sync.h b/include/grpc/impl/codegen/atm_gcc_sync.h
index 9aa2b43189..946545a671 100644
--- a/include/grpc/impl/codegen/atm_gcc_sync.h
+++ b/include/grpc/impl/codegen/atm_gcc_sync.h
@@ -83,6 +83,7 @@ static __inline void gpr_atm_no_barrier_store(gpr_atm *p, gpr_atm value) {
#define gpr_atm_no_barrier_cas(p, o, n) gpr_atm_acq_cas((p), (o), (n))
#define gpr_atm_acq_cas(p, o, n) (__sync_bool_compare_and_swap((p), (o), (n)))
#define gpr_atm_rel_cas(p, o, n) gpr_atm_acq_cas((p), (o), (n))
+#define gpr_atm_full_cas(p, o, n) gpr_atm_acq_cas((p), (o), (n))
static __inline gpr_atm gpr_atm_full_xchg(gpr_atm *p, gpr_atm n) {
gpr_atm cur;
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index e5c731304c..833ccf3bfe 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -87,6 +87,9 @@ typedef struct grpc_call grpc_call;
/** The Socket Mutator interface allows changes on socket options */
typedef struct grpc_socket_mutator grpc_socket_mutator;
+/** The Socket Factory interface creates and binds sockets */
+typedef struct grpc_socket_factory grpc_socket_factory;
+
/** Type specifier for grpc_arg */
typedef enum {
GRPC_ARG_STRING,
@@ -195,14 +198,14 @@ typedef struct {
#define GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE "grpc.http2.write_buffer_size"
/** After a duration of this time the client pings the server to see if the
transport is still alive. Int valued, seconds. */
-#define GRPC_ARG_HTTP2_KEEPALIVE_TIME "grpc.http2.keepalive_time"
+#define GRPC_ARG_CLIENT_KEEPALIVE_TIME_S "grpc.client_keepalive_time"
/** After waiting for a duration of this time, if the client does not receive
the ping ack, it will close the transport. Int valued, seconds. */
-#define GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT "grpc.http2.keepalive_timeout"
+#define GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S "grpc.client_keepalive_timeout"
/** Is it permissible to send keepalive pings without any outstanding streams.
Int valued, 0(false)/1(true). */
-#define GRPC_ARG_HTTP2_KEEPALIVE_PERMIT_WITHOUT_CALLS \
- "grpc.http2.keepalive_permit_without_calls"
+#define GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS \
+ "grpc.keepalive_permit_without_calls"
/** Default authority to pass if none specified on call construction. A string.
* */
#define GRPC_ARG_DEFAULT_AUTHORITY "grpc.default_authority"
@@ -240,6 +243,8 @@ typedef struct {
#define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name"
/** The grpc_socket_mutator instance that set the socket options. A pointer. */
#define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator"
+/** The grpc_socket_factory instance to create and bind sockets. A pointer. */
+#define GRPC_ARG_SOCKET_FACTORY "grpc.socket_factory"
/** If non-zero, Cronet transport will coalesce packets to fewer frames when
* possible. */
#define GRPC_ARG_USE_CRONET_PACKET_COALESCING \
@@ -311,13 +316,16 @@ typedef enum grpc_call_error {
/** Signal that GRPC_INITIAL_METADATA_WAIT_FOR_READY was explicitly set
by the calling application. */
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET (0x00000080u)
+/** Signal that the initial metadata should be corked */
+#define GRPC_INITIAL_METADATA_CORKED (0x00000100u)
/** Mask of all valid flags */
-#define GRPC_INITIAL_METADATA_USED_MASK \
- (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
- GRPC_INITIAL_METADATA_WAIT_FOR_READY | \
- GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \
- GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)
+#define GRPC_INITIAL_METADATA_USED_MASK \
+ (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY | \
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \
+ GRPC_INITIAL_METADATA_CORKED)
/** A single metadata element */
typedef struct grpc_metadata {
diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h
index e565cd31d7..086394648f 100644
--- a/include/grpc/impl/codegen/port_platform.h
+++ b/include/grpc/impl/codegen/port_platform.h
@@ -364,6 +364,14 @@ typedef unsigned __int64 uint64_t;
power of two */
#define GPR_MAX_ALIGNMENT 16
+#ifndef GRPC_ARES
+#ifdef GPR_WINDOWS
+#define GRPC_ARES 0
+#else
+#define GRPC_ARES 1
+#endif
+#endif
+
#ifndef GRPC_MUST_USE_RESULT
#if defined(__GNUC__) && !defined(__MINGW32__)
#define GRPC_MUST_USE_RESULT __attribute__((warn_unused_result))