aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/impl/codegen/async_stream.h36
-rw-r--r--include/grpc++/impl/codegen/async_unary_call.h12
-rw-r--r--include/grpc++/impl/codegen/call.h7
-rw-r--r--include/grpc++/impl/codegen/client_context.h13
-rw-r--r--include/grpc++/impl/codegen/client_unary_call.h3
-rw-r--r--include/grpc++/impl/codegen/method_handler_impl.h15
-rw-r--r--include/grpc++/impl/codegen/server_context.h2
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h24
-rw-r--r--include/grpc/impl/codegen/grpc_types.h6
-rw-r--r--src/core/ext/client_config/client_channel.c40
-rw-r--r--src/core/ext/client_config/lb_policy.c11
-rw-r--r--src/core/ext/client_config/lb_policy.h13
-rw-r--r--src/core/ext/client_config/subchannel.c2
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c48
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c40
-rw-r--r--src/core/lib/channel/http_client_filter.c10
-rw-r--r--src/core/lib/channel/subchannel_call_holder.c5
-rw-r--r--src/core/lib/channel/subchannel_call_holder.h1
-rw-r--r--src/core/lib/surface/call.c10
-rw-r--r--src/core/lib/transport/transport.h6
-rw-r--r--src/cpp/client/client_context.cc2
-rw-r--r--test/core/bad_ssl/bad_ssl_test.c2
-rw-r--r--test/core/end2end/dualstack_socket_test.c4
-rw-r--r--test/core/end2end/tests/simple_delayed_request.c2
-rw-r--r--test/cpp/end2end/client_crash_test.cc2
-rw-r--r--test/cpp/end2end/server_crash_test_client.cc1
26 files changed, 237 insertions, 80 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index 78fb7274e2..a607a47106 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -108,7 +108,8 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
init_ops_.ClientSendClose();
@@ -173,7 +174,8 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
finish_ops_.RecvMessage(response);
init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
call_.PerformOps(&init_ops_);
}
@@ -240,7 +242,8 @@ class ClientAsyncReaderWriter GRPC_FINAL
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
call_.PerformOps(&init_ops_);
}
@@ -305,7 +308,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
- meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@@ -319,7 +323,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const W& msg, const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@@ -336,7 +341,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@@ -366,7 +372,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
- meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@@ -374,7 +381,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@@ -385,7 +393,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@@ -415,7 +424,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
- meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@@ -429,7 +439,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@@ -440,7 +451,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index 52b34770a8..55c9788fbd 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -67,7 +67,8 @@ class ClientAsyncResponseReader GRPC_FINAL
call_(channel->CreateCall(method, context, cq)),
collection_(new CallOpSetCollection) {
collection_->init_buf_.SetCollection(collection_);
- collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+ collection_->init_buf_.SendInitialMetadata(
+ context->send_initial_metadata_, context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok());
collection_->init_buf_.ClientSendClose();
@@ -122,7 +123,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.set_output_tag(tag);
- meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ meta_buf_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
@@ -130,7 +132,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@@ -147,7 +150,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
GPR_CODEGEN_ASSERT(!status.ok());
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index f76d7c23ed..aea1a6acec 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -181,8 +181,10 @@ class CallOpSendInitialMetadata {
CallOpSendInitialMetadata() : send_(false) {}
void SendInitialMetadata(
- const std::multimap<grpc::string, grpc::string>& metadata) {
+ const std::multimap<grpc::string, grpc::string>& metadata,
+ uint32_t flags) {
send_ = true;
+ flags_ = flags;
initial_metadata_count_ = metadata.size();
initial_metadata_ = FillMetadataArray(metadata);
}
@@ -192,7 +194,7 @@ class CallOpSendInitialMetadata {
if (!send_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_INITIAL_METADATA;
- op->flags = 0;
+ op->flags = flags_;
op->reserved = NULL;
op->data.send_initial_metadata.count = initial_metadata_count_;
op->data.send_initial_metadata.metadata = initial_metadata_;
@@ -204,6 +206,7 @@ class CallOpSendInitialMetadata {
}
bool send_;
+ uint32_t flags_;
size_t initial_metadata_count_;
grpc_metadata* initial_metadata_;
};
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index 5201808600..aed12767a7 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -221,6 +221,12 @@ class ClientContext {
deadline_ = deadline_tp.raw_time();
}
+ /// EXPERIMENTAL: Set this request to be idempotent
+ void set_idempotent(bool idempotent) { idempotent_ = idempotent; }
+
+ /// EXPERIMENTAL: Trigger fail-fast or not on this request
+ void set_fail_fast(bool fail_fast) { fail_fast_ = fail_fast; }
+
#ifndef GRPC_CXX0X_NO_CHRONO
/// Return the deadline for the client call.
std::chrono::system_clock::time_point deadline() {
@@ -328,9 +334,16 @@ class ClientContext {
grpc_call* call() { return call_; }
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);
+ uint32_t initial_metadata_flags() const {
+ return (idempotent_ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST : 0) |
+ (fail_fast_ ? 0 : GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY);
+ }
+
grpc::string authority() { return authority_; }
bool initial_metadata_received_;
+ bool fail_fast_;
+ bool idempotent_;
std::shared_ptr<Channel> channel_;
grpc::mutex mu_;
grpc_call* call_;
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h
index 0ee5b198d2..70d65549c8 100644
--- a/include/grpc++/impl/codegen/client_unary_call.h
+++ b/include/grpc++/impl/codegen/client_unary_call.h
@@ -62,7 +62,8 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
if (!status.ok()) {
return status;
}
- ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
ops.RecvInitialMetadata(context);
ops.RecvMessage(result);
ops.ClientSendClose();
diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h
index 436b4d819b..ad74efabc4 100644
--- a/include/grpc++/impl/codegen/method_handler_impl.h
+++ b/include/grpc++/impl/codegen/method_handler_impl.h
@@ -63,7 +63,8 @@ class RpcMethodHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
ops;
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ ops.SendInitialMetadata(param.server_context->initial_metadata_,
+ param.server_context->initial_metadata_flags());
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@@ -100,7 +101,8 @@ class ClientStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
ops;
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ ops.SendInitialMetadata(param.server_context->initial_metadata_,
+ param.server_context->initial_metadata_flags());
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@@ -138,7 +140,8 @@ class ServerStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ ops.SendInitialMetadata(param.server_context->initial_metadata_,
+ param.server_context->initial_metadata_flags());
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@@ -170,7 +173,8 @@ class BidiStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ ops.SendInitialMetadata(param.server_context->initial_metadata_,
+ param.server_context->initial_metadata_flags());
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@@ -191,7 +195,8 @@ class UnknownMethodHandler : public MethodHandler {
static void FillOps(ServerContext* context, T* ops) {
Status status(StatusCode::UNIMPLEMENTED, "");
if (!context->sent_initial_metadata_) {
- ops->SendInitialMetadata(context->initial_metadata_);
+ ops->SendInitialMetadata(context->initial_metadata_,
+ context->initial_metadata_flags());
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(context->trailing_metadata_, status);
diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h
index 44d340aa45..7fa0235ca9 100644
--- a/include/grpc++/impl/codegen/server_context.h
+++ b/include/grpc++/impl/codegen/server_context.h
@@ -195,6 +195,8 @@ class ServerContext {
void set_call(grpc_call* call);
+ uint32_t initial_metadata_flags() const { return 0; }
+
CompletionOp* completion_op_;
bool has_notify_when_done_tag_;
void* async_notify_when_done_tag_;
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index d0ad05169f..9100ce09a2 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -125,7 +125,8 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
ops;
- ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
@@ -190,7 +191,8 @@ class ClientWriter : public ClientWriterInterface<W> {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
@@ -268,7 +270,8 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
@@ -334,7 +337,8 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@@ -361,7 +365,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@@ -374,7 +379,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
return false;
}
if (!ctx_->sent_initial_metadata_) {
- ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
@@ -397,7 +403,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@@ -417,7 +424,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
return false;
}
if (!ctx_->sent_initial_metadata_) {
- ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index b09b1cdf44..e3e882c8b3 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -202,8 +202,12 @@ typedef enum grpc_call_error {
/* Initial metadata flags */
/** Signal that the call is idempotent */
#define GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST (0x00000010u)
+/** Signal that the call should not return UNAVAILABLE before it has started */
+#define GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY (0x00000020u)
/** Mask of all valid flags */
-#define GRPC_INITIAL_METADATA_USED_MASK GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+#define GRPC_INITIAL_METADATA_USED_MASK \
+ (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
+ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY)
/** A single metadata element */
typedef struct grpc_metadata {
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 0ff67a7d87..3b2f7b8785 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -114,6 +114,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
+static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand,
+ grpc_connectivity_state state,
+ const char *reason) {
+ if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
+ state == GRPC_CHANNEL_FATAL_FAILURE) &&
+ chand->lb_policy != NULL) {
+ /* cancel fail-fast picks */
+ grpc_lb_policy_cancel_picks(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
+ /* check= */ 0);
+ }
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
+}
+
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
grpc_connectivity_state publish_state = w->state;
@@ -127,8 +143,8 @@ static void on_lb_policy_state_changed_locked(
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
- "lb_changed");
+ set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
+ "lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
@@ -200,8 +216,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (iomgr_success && chand->resolver) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
- "new_lb+resolver");
+ set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@@ -216,8 +232,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@@ -272,8 +288,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect && chand->resolver != NULL) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@@ -290,6 +306,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_metadata_batch *initial_metadata;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
@@ -298,6 +315,7 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
@@ -308,6 +326,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
@@ -316,6 +335,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
@@ -349,7 +369,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
- initial_metadata, connected_subchannel, on_ready);
+ initial_metadata, initial_metadata_flags,
+ connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
return r;
}
@@ -362,6 +383,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
+ cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 72c1cf250c..a7ad9842dc 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -101,10 +101,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
- target, on_complete);
+ initial_metadata_flags, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -112,6 +113,14 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
+ initial_metadata_flags_eq);
+}
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 44b715c7a1..0384e0b2eb 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -60,9 +60,13 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -122,6 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
@@ -131,6 +136,14 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+/** Cancel all pending picks which have:
+ (initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq */
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 5285041dd5..e6e3885e40 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -54,7 +54,7 @@
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
-#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index cac62b81ea..5926f9d70b 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -40,6 +40,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -149,6 +150,31 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
@@ -171,6 +197,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -199,6 +226,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->next = p->pending_picks;
pp->pollset = pollset;
pp->target = target;
+ pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -286,11 +314,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
+ if (p->checking_subchannel == 0) {
+ /* only trigger transient failure when we've tried all alternatives */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connecting_transient_failure");
+ }
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@@ -378,14 +409,9 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy,
- pf_shutdown,
- pf_pick,
- pf_cancel_pick,
- pf_ping_one,
- pf_exit_idle,
- pf_check_connectivity,
- pf_notify_on_state_change};
+ pf_destroy, pf_shutdown, pf_pick,
+ pf_cancel_pick, pf_cancel_picks, pf_ping_one,
+ pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 16f3942d3a..3f6051b892 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -49,6 +49,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -275,6 +276,32 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ *pp->target = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
@@ -303,6 +330,7 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -330,6 +358,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->pollset = pollset;
pp->target = target;
pp->on_complete = on_complete;
+ pp->initial_metadata_flags = initial_metadata_flags;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -485,14 +514,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown,
- rr_pick,
- rr_cancel_pick,
- rr_ping_one,
- rr_exit_idle,
- rr_check_connectivity,
- rr_notify_on_state_change};
+ rr_destroy, rr_shutdown, rr_pick,
+ rr_cancel_pick, rr_cancel_picks, rr_ping_one,
+ rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index deba23e21f..211f537c69 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -111,10 +111,12 @@ static void hc_mutate_op(grpc_call_element *elem,
elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
- grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
- op->send_idempotent_request
- ? GRPC_MDELEM_METHOD_PUT
- : GRPC_MDELEM_METHOD_POST);
+ grpc_metadata_batch_add_head(
+ op->send_initial_metadata, &calld->method,
+ op->send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ ? GRPC_MDELEM_METHOD_PUT
+ : GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
channeld->static_scheme);
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
diff --git a/src/core/lib/channel/subchannel_call_holder.c b/src/core/lib/channel/subchannel_call_holder.c
index 22f3679bf5..a2686a380a 100644
--- a/src/core/lib/channel/subchannel_call_holder.c
+++ b/src/core/lib/channel/subchannel_call_holder.c
@@ -127,7 +127,7 @@ retry:
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
- &holder->connected_subchannel, NULL);
+ 0, &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@@ -145,7 +145,8 @@ retry:
GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
if (holder->pick_subchannel(
exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
- &holder->connected_subchannel, &holder->next_step)) {
+ op->send_initial_metadata_flags, &holder->connected_subchannel,
+ &holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}
diff --git a/src/core/lib/channel/subchannel_call_holder.h b/src/core/lib/channel/subchannel_call_holder.h
index 8e3a99e40b..2107a06cd9 100644
--- a/src/core/lib/channel/subchannel_call_holder.h
+++ b/src/core/lib/channel/subchannel_call_holder.h
@@ -42,6 +42,7 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 37cc724b53..ba94852f9a 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -81,11 +81,11 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
- /* Status was created by some internal channel stack operation */
- STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
@@ -1110,6 +1110,9 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
gpr_mu_lock(&call->mu);
if (bctl->send_initial_metadata) {
+ if (!success) {
+ set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
+ }
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
@@ -1232,8 +1235,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->metadata_batch[0][0].deadline = call->send_deadline;
stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op.send_idempotent_request =
- (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0;
+ stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 460e4dcedc..bb1120ee3a 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -101,9 +101,9 @@ typedef struct grpc_transport_stream_op {
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;
- /** Iff send_initial_metadata != NULL, flags if this is an idempotent request
- or not */
- bool send_idempotent_request;
+ /** Iff send_initial_metadata != NULL, flags associated with
+ send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
+ uint32_t send_initial_metadata_flags;
/** Send trailing metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_trailing_metadata;
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index bf6423128a..c277d7ebe8 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -60,6 +60,8 @@ static ClientContext::GlobalCallbacks* g_client_callbacks =
ClientContext::ClientContext()
: initial_metadata_received_(false),
+ fail_fast_(true),
+ idempotent_(false),
call_(nullptr),
call_canceled_(false),
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
diff --git a/test/core/bad_ssl/bad_ssl_test.c b/test/core/bad_ssl/bad_ssl_test.c
index 12c7710af6..c4ae212ec2 100644
--- a/test/core/bad_ssl/bad_ssl_test.c
+++ b/test/core/bad_ssl/bad_ssl_test.c
@@ -87,7 +87,7 @@ static void run_test(const char *target, size_t nops) {
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
- op->flags = 0;
+ op->flags = GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index 8568bb1a86..81f76ea79c 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -168,7 +168,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
- op->flags = 0;
+ op->flags = expect_ok ? GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY : 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
@@ -237,7 +237,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
- GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED);
+ GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
}
grpc_call_destroy(c);
diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c
index bb9d462cef..122dcd0a5b 100644
--- a/test/core/end2end/tests/simple_delayed_request.c
+++ b/test/core/end2end/tests/simple_delayed_request.c
@@ -120,7 +120,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
- op->flags = 0;
+ op->flags = GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
diff --git a/test/cpp/end2end/client_crash_test.cc b/test/cpp/end2end/client_crash_test.cc
index c01f40dbc2..30b04fb9b9 100644
--- a/test/cpp/end2end/client_crash_test.cc
+++ b/test/cpp/end2end/client_crash_test.cc
@@ -88,6 +88,7 @@ TEST_F(CrashTest, KillBeforeWrite) {
EchoRequest request;
EchoResponse response;
ClientContext context;
+ context.set_fail_fast(false);
auto stream = stub->BidiStream(&context);
@@ -113,6 +114,7 @@ TEST_F(CrashTest, KillAfterWrite) {
EchoRequest request;
EchoResponse response;
ClientContext context;
+ context.set_fail_fast(false);
auto stream = stub->BidiStream(&context);
diff --git a/test/cpp/end2end/server_crash_test_client.cc b/test/cpp/end2end/server_crash_test_client.cc
index 1964840fa5..ece948d5a7 100644
--- a/test/cpp/end2end/server_crash_test_client.cc
+++ b/test/cpp/end2end/server_crash_test_client.cc
@@ -63,6 +63,7 @@ int main(int argc, char** argv) {
EchoRequest request;
EchoResponse response;
grpc::ClientContext context;
+ context.set_fail_fast(false);
if (FLAGS_mode == "bidi") {
auto stream = stub->BidiStream(&context);