aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/gen/README.md2
-rw-r--r--src/core/ext/census/gen/trace_context.pb.h2
-rw-r--r--src/core/ext/census/grpc_filter.c20
-rw-r--r--src/core/ext/census/grpc_plugin.c2
-rw-r--r--src/core/ext/census/trace_label.h2
-rw-r--r--src/core/ext/census/trace_propagation.h2
-rw-r--r--src/core/ext/census/trace_status.h2
-rw-r--r--src/core/ext/census/trace_string.h2
-rw-r--r--src/core/ext/census/tracing.h2
-rw-r--r--src/core/ext/filters/client_channel/README.md (renamed from src/core/ext/client_channel/README.md)0
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c (renamed from src/core/ext/client_channel/channel_connectivity.c)2
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c (renamed from src/core/ext/client_channel/client_channel.c)147
-rw-r--r--src/core/ext/filters/client_channel/client_channel.h (renamed from src/core/ext/client_channel/client_channel.h)10
-rw-r--r--src/core/ext/filters/client_channel/client_channel_factory.c (renamed from src/core/ext/client_channel/client_channel_factory.c)2
-rw-r--r--src/core/ext/filters/client_channel/client_channel_factory.h (renamed from src/core/ext/client_channel/client_channel_factory.h)8
-rw-r--r--src/core/ext/filters/client_channel/client_channel_plugin.c (renamed from src/core/ext/client_channel/client_channel_plugin.c)16
-rw-r--r--src/core/ext/filters/client_channel/connector.c (renamed from src/core/ext/client_channel/connector.c)2
-rw-r--r--src/core/ext/filters/client_channel/connector.h (renamed from src/core/ext/client_channel/connector.h)6
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.c (renamed from src/core/ext/client_channel/http_connect_handshaker.c)8
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.h (renamed from src/core/ext/client_channel/http_connect_handshaker.h)6
-rw-r--r--src/core/ext/filters/client_channel/http_proxy.c (renamed from src/core/ext/client_channel/http_proxy.c)8
-rw-r--r--src/core/ext/filters/client_channel/http_proxy.h (renamed from src/core/ext/client_channel/http_proxy.h)6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.c (renamed from src/core/ext/client_channel/lb_policy.c)2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h (renamed from src/core/ext/client_channel/lb_policy.h)8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c (renamed from src/core/ext/lb_policy/grpclb/grpclb.c)20
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h (renamed from src/core/ext/lb_policy/grpclb/grpclb.h)8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c (renamed from src/core/ext/lb_policy/grpclb/grpclb_channel.c)4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h (renamed from src/core/ext/lb_policy/grpclb/grpclb_channel.h)9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c (renamed from src/core/ext/lb_policy/grpclb/grpclb_channel_secure.c)4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c (renamed from src/core/ext/lb_policy/grpclb/load_balancer_api.c)2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h (renamed from src/core/ext/lb_policy/grpclb/load_balancer_api.h)11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c (renamed from src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c)2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h (renamed from src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h)0
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c (renamed from src/core/ext/lb_policy/pick_first/pick_first.c)8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c (renamed from src/core/ext/lb_policy/round_robin/round_robin.c)8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.c (renamed from src/core/ext/client_channel/lb_policy_factory.c)2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h (renamed from src/core/ext/client_channel/lb_policy_factory.h)10
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_registry.c (renamed from src/core/ext/client_channel/lb_policy_registry.c)2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_registry.h (renamed from src/core/ext/client_channel/lb_policy_registry.h)8
-rw-r--r--src/core/ext/filters/client_channel/parse_address.c (renamed from src/core/ext/client_channel/parse_address.c)2
-rw-r--r--src/core/ext/filters/client_channel/parse_address.h (renamed from src/core/ext/client_channel/parse_address.h)8
-rw-r--r--src/core/ext/filters/client_channel/proxy_mapper.c (renamed from src/core/ext/client_channel/proxy_mapper.c)2
-rw-r--r--src/core/ext/filters/client_channel/proxy_mapper.h (renamed from src/core/ext/client_channel/proxy_mapper.h)6
-rw-r--r--src/core/ext/filters/client_channel/proxy_mapper_registry.c (renamed from src/core/ext/client_channel/proxy_mapper_registry.c)2
-rw-r--r--src/core/ext/filters/client_channel/proxy_mapper_registry.h (renamed from src/core/ext/client_channel/proxy_mapper_registry.h)8
-rw-r--r--src/core/ext/filters/client_channel/resolver.c (renamed from src/core/ext/client_channel/resolver.c)2
-rw-r--r--src/core/ext/filters/client_channel/resolver.h (renamed from src/core/ext/client_channel/resolver.h)8
-rw-r--r--src/core/ext/filters/client_channel/resolver/README.md (renamed from src/core/ext/resolver/README.md)0
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c350
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h66
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c319
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c289
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h64
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/README.md (renamed from src/core/ext/resolver/dns/native/README.md)0
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c (renamed from src/core/ext/resolver/dns/native/dns_resolver.c)21
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/README.md (renamed from src/core/ext/resolver/sockaddr/README.md)0
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c (renamed from src/core/ext/resolver/sockaddr/sockaddr_resolver.c)6
-rw-r--r--src/core/ext/filters/client_channel/resolver_factory.c (renamed from src/core/ext/client_channel/resolver_factory.c)2
-rw-r--r--src/core/ext/filters/client_channel/resolver_factory.h (renamed from src/core/ext/client_channel/resolver_factory.h)12
-rw-r--r--src/core/ext/filters/client_channel/resolver_registry.c (renamed from src/core/ext/client_channel/resolver_registry.c)3
-rw-r--r--src/core/ext/filters/client_channel/resolver_registry.h (renamed from src/core/ext/client_channel/resolver_registry.h)8
-rw-r--r--src/core/ext/filters/client_channel/retry_throttle.c (renamed from src/core/ext/client_channel/retry_throttle.c)2
-rw-r--r--src/core/ext/filters/client_channel/retry_throttle.h (renamed from src/core/ext/client_channel/retry_throttle.h)6
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c (renamed from src/core/ext/client_channel/subchannel.c)17
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h (renamed from src/core/ext/client_channel/subchannel.h)10
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.c (renamed from src/core/ext/client_channel/subchannel_index.c)2
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.h (renamed from src/core/ext/client_channel/subchannel_index.h)10
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.c (renamed from src/core/ext/client_channel/uri_parser.c)2
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.h (renamed from src/core/ext/client_channel/uri_parser.h)6
-rw-r--r--src/core/ext/filters/load_reporting/load_reporting.c (renamed from src/core/ext/load_reporting/load_reporting.c)4
-rw-r--r--src/core/ext/filters/load_reporting/load_reporting.h (renamed from src/core/ext/load_reporting/load_reporting.h)6
-rw-r--r--src/core/ext/filters/load_reporting/load_reporting_filter.c (renamed from src/core/ext/load_reporting/load_reporting_filter.c)25
-rw-r--r--src/core/ext/filters/load_reporting/load_reporting_filter.h (renamed from src/core/ext/load_reporting/load_reporting_filter.h)8
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.c419
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.h39
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c6
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.h2
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c4
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c174
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c20
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.h3
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h8
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c8
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c100
85 files changed, 2064 insertions, 366 deletions
diff --git a/src/core/ext/census/gen/README.md b/src/core/ext/census/gen/README.md
index fdbac1084c..d4612bc7c8 100644
--- a/src/core/ext/census/gen/README.md
+++ b/src/core/ext/census/gen/README.md
@@ -1,6 +1,6 @@
Files generated for use by Census stats and trace recording subsystem.
-#Files
+# Files
* census.pb.{h,c} - Generated from src/core/ext/census/census.proto, using the
script `tools/codegen/core/gen_nano_proto.sh src/proto/census/census.proto
$PWD/src/core/ext/census/gen src/core/ext/census/gen`
diff --git a/src/core/ext/census/gen/trace_context.pb.h b/src/core/ext/census/gen/trace_context.pb.h
index ea127bf70a..6fafc04c81 100644
--- a/src/core/ext/census/gen/trace_context.pb.h
+++ b/src/core/ext/census/gen/trace_context.pb.h
@@ -90,4 +90,4 @@ extern const pb_field_t google_trace_TraceContext_fields[5];
#endif
/* @@protoc_insertion_point(eof) */
-#endif
+#endif /* GRPC_CORE_EXT_CENSUS_GEN_TRACE_CONTEXT_PB_H */
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index fc29dbd454..bcf59a4efe 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -74,17 +74,18 @@ static void extract_and_annotate_method_tag(grpc_metadata_batch *md,
}
static void client_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_initial_metadata) {
- extract_and_annotate_method_tag(op->send_initial_metadata, calld, chand);
+ extract_and_annotate_method_tag(
+ op->payload->send_initial_metadata.send_initial_metadata, calld, chand);
}
}
static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
client_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -103,19 +104,22 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
}
static void server_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
/* substitute our callback for the op callback */
- calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->finish_recv;
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->on_done_recv =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->finish_recv;
}
}
static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
/* TODO(ctiller): this code fails. I don't know why. I expect it's
incomplete, and someone should look at it soon.
diff --git a/src/core/ext/census/grpc_plugin.c b/src/core/ext/census/grpc_plugin.c
index c9fe453af8..28d266e22a 100644
--- a/src/core/ext/census/grpc_plugin.c
+++ b/src/core/ext/census/grpc_plugin.c
@@ -48,7 +48,7 @@ static bool is_census_enabled(const grpc_channel_args *a) {
return a->args[i].value.integer != 0 && census_enabled();
}
}
- return census_enabled();
+ return census_enabled() && !grpc_channel_args_want_minimal_stack(a);
}
static bool maybe_add_census_filter(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/census/trace_label.h b/src/core/ext/census/trace_label.h
index 0e4a8d885f..701f6f5a63 100644
--- a/src/core/ext/census/trace_label.h
+++ b/src/core/ext/census/trace_label.h
@@ -58,4 +58,4 @@ typedef struct trace_label {
} value;
} trace_label;
-#endif
+#endif /* GRPC_CORE_EXT_CENSUS_TRACE_LABEL_H */
diff --git a/src/core/ext/census/trace_propagation.h b/src/core/ext/census/trace_propagation.h
index 75c4ebaa39..0c27bfa111 100644
--- a/src/core/ext/census/trace_propagation.h
+++ b/src/core/ext/census/trace_propagation.h
@@ -60,4 +60,4 @@ size_t trace_span_context_to_http_format(const trace_span_context *ctxt,
size_t http_format_to_trace_span_context(const char *buf, size_t buf_size,
trace_span_context *ctxt);
-#endif
+#endif /* GRPC_CORE_EXT_CENSUS_TRACE_PROPAGATION_H */
diff --git a/src/core/ext/census/trace_status.h b/src/core/ext/census/trace_status.h
index adc0ebd0c0..0734ec6d3d 100644
--- a/src/core/ext/census/trace_status.h
+++ b/src/core/ext/census/trace_status.h
@@ -42,4 +42,4 @@ typedef struct trace_status {
trace_string errorMessage;
} trace_status;
-#endif
+#endif /* GRPC_CORE_EXT_CENSUS_TRACE_STATUS_H */
diff --git a/src/core/ext/census/trace_string.h b/src/core/ext/census/trace_string.h
index 8e77ee9f7e..cd63cfb92f 100644
--- a/src/core/ext/census/trace_string.h
+++ b/src/core/ext/census/trace_string.h
@@ -47,4 +47,4 @@ typedef struct trace_string {
size_t length;
} trace_string;
-#endif
+#endif /* GRPC_CORE_EXT_CENSUS_TRACE_STRING_H */
diff --git a/src/core/ext/census/tracing.h b/src/core/ext/census/tracing.h
index c2b947ae40..3a7c6f45e4 100644
--- a/src/core/ext/census/tracing.h
+++ b/src/core/ext/census/tracing.h
@@ -121,4 +121,4 @@ free to ignore all further calls using the Span. EndSpanOptions can
optionally be NULL. */
void trace_end_span(const trace_status *status, trace_span_context *span_ctxt);
-#endif
+#endif /* GRPC_CORE_EXT_CENSUS_TRACING_H */
diff --git a/src/core/ext/client_channel/README.md b/src/core/ext/filters/client_channel/README.md
index 7c209db12e..7c209db12e 100644
--- a/src/core/ext/client_channel/README.md
+++ b/src/core/ext/filters/client_channel/README.md
diff --git a/src/core/ext/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index f6cb3b9115..62f58fb278 100644
--- a/src/core/ext/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -36,7 +36,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/completion_queue.h"
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 435a3ab0fe..93ad53aab9 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
#include <stdbool.h>
#include <stdio.h>
@@ -43,12 +43,12 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/client_channel/lb_policy_registry.h"
-#include "src/core/ext/client_channel/proxy_mapper_registry.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
-#include "src/core/ext/client_channel/retry_throttle.h"
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/deadline_filter.h"
@@ -374,8 +374,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// resolver actually specified.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
- if (channel_arg != NULL) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
+ if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
bool found_backend_address = false;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@@ -533,7 +532,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
grpc_transport_op *op = arg;
- grpc_channel_element *elem = op->transport_private.args[0];
+ grpc_channel_element *elem = op->handler_private.extra_arg;
channel_data *chand = elem->channel_data;
if (op->on_connectivity_state_change != NULL) {
@@ -595,12 +594,12 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->bind_pollset);
}
- op->transport_private.args[0] = elem;
+ op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
grpc_closure_sched(
- exec_ctx, grpc_closure_init(
- &op->transport_private.closure, start_transport_op_locked,
- op, grpc_combiner_scheduler(chand->combiner, false)),
+ exec_ctx,
+ grpc_closure_init(&op->handler_private.closure, start_transport_op_locked,
+ op, grpc_combiner_scheduler(chand->combiner, false)),
GRPC_ERROR_NONE);
}
@@ -643,14 +642,26 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
// Record client channel factory.
const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
- GPR_ASSERT(arg != NULL);
- GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
+ if (arg == NULL) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Missing client channel factory in args for client channel filter");
+ }
+ if (arg->type != GRPC_ARG_POINTER) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "client channel factory arg must be a pointer");
+ }
grpc_client_channel_factory_ref(arg->value.pointer.p);
chand->client_channel_factory = arg->value.pointer.p;
// Get server name to resolve, using proxy mapper if needed.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(arg != NULL);
- GPR_ASSERT(arg->type == GRPC_ARG_STRING);
+ if (arg == NULL) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Missing server uri in args for client channel filter");
+ }
+ if (arg->type != GRPC_ARG_STRING) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "server uri arg must be a string");
+ }
char *proxy_name = NULL;
grpc_channel_args *new_args = NULL;
grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
@@ -755,7 +766,7 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
- grpc_transport_stream_op **waiting_ops;
+ grpc_transport_stream_op_batch **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
@@ -775,7 +786,8 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
return scc == CANCELLED_CALL ? NULL : scc;
}
-static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
+static void add_waiting_locked(call_data *calld,
+ grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("add_waiting_locked", 0);
if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
@@ -791,7 +803,7 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
grpc_error *error) {
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
- grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
@@ -804,7 +816,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
}
grpc_subchannel_call *call = GET_CALL(calld);
- grpc_transport_stream_op **ops = calld->waiting_ops;
+ grpc_transport_stream_op_batch **ops = calld->waiting_ops;
size_t nops = calld->waiting_ops_count;
if (call == CANCELLED_CALL) {
fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
@@ -1052,9 +1064,9 @@ static bool pick_subchannel_locked(
return false;
}
-static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
- grpc_transport_stream_op *op,
- grpc_call_element *elem) {
+static void start_transport_stream_op_batch_locked_inner(
+ grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
+ grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_subchannel_call *call;
@@ -1062,7 +1074,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
/* need to recheck that another thread hasn't set the call */
call = GET_CALL(calld);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
/* early out */
return;
@@ -1073,11 +1085,11 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
- if (op->cancel_error != GRPC_ERROR_NONE) {
+ if (op->cancel_stream) {
if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
/* recurse to retry */
- start_transport_stream_op_locked_inner(exec_ctx, op, elem);
+ start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
/* early out */
return;
} else {
@@ -1086,27 +1098,29 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
cancelled before any ops are passed down (e.g., if the deadline
is in the past when the call starts), we can return the right
error to the caller when the first op does get passed down. */
- calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
+ calld->cancel_error =
+ GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
switch (calld->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
- fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
+ fail_locked(exec_ctx, calld,
+ GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
- pick_subchannel_locked(exec_ctx, elem, NULL, 0,
- &calld->connected_subchannel, NULL,
- GRPC_ERROR_REF(op->cancel_error));
+ pick_subchannel_locked(
+ exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL,
+ GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
break;
}
- grpc_transport_stream_op_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, op,
+ GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
/* early out */
return;
}
}
/* if we don't have a subchannel, try to get one */
if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
- calld->connected_subchannel == NULL &&
- op->send_initial_metadata != NULL) {
+ calld->connected_subchannel == NULL && op->send_initial_metadata) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
grpc_combiner_scheduler(chand->combiner, true));
@@ -1114,10 +1128,11 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
that IO of the lb_policy and resolver could be done under it. */
- if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata,
- op->send_initial_metadata_flags,
- &calld->connected_subchannel, &calld->next_step,
- GRPC_ERROR_NONE)) {
+ if (pick_subchannel_locked(
+ exec_ctx, elem,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ op->payload->send_initial_metadata.send_initial_metadata_flags,
+ &calld->connected_subchannel, &calld->next_step, GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
} else {
@@ -1140,13 +1155,13 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
}
gpr_atm_rel_store(&calld->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, calld);
/* recurse to retry */
- start_transport_stream_op_locked_inner(exec_ctx, op, elem);
+ start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
/* early out */
return;
}
@@ -1174,15 +1189,16 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_REF(error));
}
-static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error_ignored) {
- GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
+static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error_ignored) {
+ GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
- grpc_transport_stream_op *op = arg;
- grpc_call_element *elem = op->handler_private.args[0];
+ grpc_transport_stream_op_batch *op = arg;
+ grpc_call_element *elem = op->handler_private.extra_arg;
call_data *calld = elem->call_data;
- if (op->recv_trailing_metadata != NULL) {
+ if (op->recv_trailing_metadata) {
GPR_ASSERT(op->on_complete != NULL);
calld->original_on_complete = op->on_complete;
grpc_closure_init(&calld->on_complete, on_complete, elem,
@@ -1190,11 +1206,11 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
op->on_complete = &calld->on_complete;
}
- start_transport_stream_op_locked_inner(exec_ctx, op, elem);
+ start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
- "start_transport_stream_op");
- GPR_TIMER_END("start_transport_stream_op_locked", 0);
+ "start_transport_stream_op_batch");
+ GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
/* The logic here is fairly complicated, due to (a) the fact that we
@@ -1205,39 +1221,40 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
We use double-checked locking to initially see if initialization has been
performed. If it has not, we acquire the combiner and perform initialization.
If it has, we proceed on the fast path. */
-static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void cc_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
+ grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
+ op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
- GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
+ GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
- GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
if (call != NULL) {
grpc_subchannel_call_process_op(exec_ctx, call, op);
- GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
/* we failed; lock and figure out what to do */
- GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op");
- op->handler_private.args[0] = elem;
+ GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
+ op->handler_private.extra_arg = elem;
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->handler_private.closure,
- start_transport_stream_op_locked, op,
+ start_transport_stream_op_batch_locked, op,
grpc_combiner_scheduler(chand->combiner, false)),
GRPC_ERROR_NONE);
- GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@@ -1296,7 +1313,7 @@ static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
*/
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_stream_op,
+ cc_start_transport_stream_op_batch,
cc_start_transport_op,
sizeof(call_data),
cc_init_call_elem,
diff --git a/src/core/ext/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 5e6e64e58b..8d2490ea55 100644
--- a/src/core/ext/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
-#include "src/core/ext/client_channel/client_channel_factory.h"
-#include "src/core/ext/client_channel/resolver.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
// Channel arg key for server URI string.
@@ -61,4 +61,4 @@ void grpc_client_channel_watch_connectivity_state(
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
grpc_call_element *elem);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/ext/client_channel/client_channel_factory.c b/src/core/ext/filters/client_channel/client_channel_factory.c
index d2707a1556..44e83b54b5 100644
--- a/src/core/ext/client_channel/client_channel_factory.c
+++ b/src/core/ext/filters/client_channel/client_channel_factory.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory) {
factory->vtable->ref(factory);
diff --git a/src/core/ext/client_channel/client_channel_factory.h b/src/core/ext/filters/client_channel/client_channel_factory.h
index bf2764b537..2355588f18 100644
--- a/src/core/ext/client_channel/client_channel_factory.h
+++ b/src/core/ext/filters/client_channel/client_channel_factory.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H
#include <grpc/impl/codegen/grpc_types.h>
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_stack.h"
// Channel arg key for client channel factory.
@@ -89,4 +89,4 @@ grpc_channel *grpc_client_channel_factory_create_channel(
grpc_arg grpc_client_channel_factory_create_channel_arg(
grpc_client_channel_factory *factory);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */
diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c
index f51277d0b2..944af01af4 100644
--- a/src/core/ext/client_channel/client_channel_plugin.c
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.c
@@ -37,14 +37,14 @@
#include <grpc/support/alloc.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/client_channel/http_proxy.h"
-#include "src/core/ext/client_channel/lb_policy_registry.h"
-#include "src/core/ext/client_channel/proxy_mapper_registry.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
-#include "src/core/ext/client_channel/retry_throttle.h"
-#include "src/core/ext/client_channel/subchannel_index.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/http_proxy.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/surface/channel_init.h"
static bool append_filter(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_channel/connector.c b/src/core/ext/filters/client_channel/connector.c
index 7a720fd1bd..51c1d7ece7 100644
--- a/src/core/ext/client_channel/connector.c
+++ b/src/core/ext/filters/client_channel/connector.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/connector.h"
grpc_connector* grpc_connector_ref(grpc_connector* connector) {
connector->vtable->ref(connector);
diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/filters/client_channel/connector.h
index 94b5fb5c9e..23040c3e23 100644
--- a/src/core/ext/client_channel/connector.h
+++ b/src/core/ext/filters/client_channel/connector.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTOR_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTOR_H
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/resolve_address.h"
@@ -85,4 +85,4 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
grpc_error *why);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTOR_H */
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/filters/client_channel/http_connect_handshaker.c
index 778d76c39f..c09a863d00 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include <string.h>
@@ -40,9 +40,9 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
-#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/http/format_request.h"
diff --git a/src/core/ext/client_channel/http_connect_handshaker.h b/src/core/ext/filters/client_channel/http_connect_handshaker.h
index 3059d551e3..6d7c204d71 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.h
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H
/// Channel arg indicating the server in HTTP CONNECT request (string).
/// The presence of this arg triggers the use of HTTP CONNECT.
@@ -46,4 +46,4 @@
/// Registers handshaker factory.
void grpc_http_connect_register_handshaker_factory();
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H */
diff --git a/src/core/ext/client_channel/http_proxy.c b/src/core/ext/filters/client_channel/http_proxy.c
index e280cef101..f8a2d06b8a 100644
--- a/src/core/ext/client_channel/http_proxy.c
+++ b/src/core/ext/filters/client_channel/http_proxy.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/http_proxy.h"
+#include "src/core/ext/filters/client_channel/http_proxy.h"
#include <stdbool.h>
#include <string.h>
@@ -40,9 +40,9 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/client_channel/proxy_mapper_registry.h"
-#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/support/env.h"
diff --git a/src/core/ext/client_channel/http_proxy.h b/src/core/ext/filters/client_channel/http_proxy.h
index c8882b1ef1..f0df74bbda 100644
--- a/src/core/ext/client_channel/http_proxy.h
+++ b/src/core/ext/filters/client_channel/http_proxy.h
@@ -31,9 +31,9 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_PROXY_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_PROXY_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HTTP_PROXY_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HTTP_PROXY_H
void grpc_register_http_proxy_mapper();
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_PROXY_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HTTP_PROXY_H */
diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c
index aba51add53..2d31499d13 100644
--- a/src/core/ext/client_channel/lb_policy.c
+++ b/src/core/ext/filters/client_channel/lb_policy.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/lib/iomgr/combiner.h"
#define WEAK_REF_BITS 16
diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 3405709c2c..25427666ae 100644
--- a/src/core/ext/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -206,4 +206,4 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 601b0e643b..ff8d319309 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -106,14 +106,14 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/client_channel_factory.h"
-#include "src/core/ext/client_channel/lb_policy_factory.h"
-#include "src/core/ext/client_channel/lb_policy_registry.h"
-#include "src/core/ext/client_channel/parse_address.h"
-#include "src/core/ext/lb_policy/grpclb/grpclb.h"
-#include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
-#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -847,7 +847,9 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* this is the right LB policy to use. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ return NULL;
+ }
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
index ff23f3a545..b069fae2f8 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
@@ -31,14 +31,14 @@
*
*/
-#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H
-#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H
-#include "src/core/ext/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
/** Returns a load balancing factory for the glb policy, which tries to connect
* to a load balancing server to decide the next successfully connected
* subchannel to pick. */
grpc_lb_policy_factory *grpc_glb_lb_factory_create();
-#endif /* GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb_channel.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
index 1b8bbab1b6..d6201f2387 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb_channel.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
@@ -34,8 +34,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
diff --git a/src/core/ext/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
index f66082d78e..9730c971d9 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
-#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
-#include "src/core/ext/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/lib/slice/slice_hash_table.h"
/** Create the channel used for communicating with an LB service.
@@ -53,4 +53,5 @@ grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
grpc_slice_hash_table *targets_info,
const grpc_channel_args *args);
-#endif /* GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H \
+ */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb_channel_secure.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
index 2fee5f1b8e..a145cba63c 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb_channel_secure.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
@@ -34,8 +34,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/security/credentials/credentials.h"
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index 3c4604c402..10af252531 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
#include "third_party/nanopb/pb_decode.h"
#include "third_party/nanopb/pb_encode.h"
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index b4c967e426..d014b8800c 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -31,13 +31,13 @@
*
*/
-#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H
-#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H
#include <grpc/slice_buffer.h>
-#include "src/core/ext/client_channel/lb_policy_factory.h"
-#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#ifdef __cplusplus
extern "C" {
@@ -101,4 +101,5 @@ void grpc_grpclb_initial_response_destroy(
}
#endif
-#endif /* GRPC_CORE_EXT_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H \
+ */
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index e352e0396f..e9adf98711 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -1,7 +1,7 @@
/* Automatically generated nanopb constant definitions */
/* Generated by nanopb-0.3.7-dev */
-#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 725aa7e386..725aa7e386 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index fc65dfdcb9..2b77cd39b8 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -35,8 +35,8 @@
#include <grpc/support/alloc.h>
-#include "src/core/ext/client_channel/lb_policy_registry.h"
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -402,7 +402,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
* addresses, since we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ return NULL;
+ }
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index a62082a2ff..ff41e61b3e 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -63,8 +63,8 @@
#include <grpc/support/alloc.h>
-#include "src/core/ext/client_channel/lb_policy_registry.h"
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -691,7 +691,9 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
* addresses, since we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ return NULL;
+ }
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
diff --git a/src/core/ext/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c
index 7af9bb0411..e2af216b89 100644
--- a/src/core/ext/client_channel/lb_policy_factory.c
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.c
@@ -36,7 +36,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
grpc_lb_addresses* grpc_lb_addresses_create(
size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) {
diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 27c12c0d73..81ab12ec8f 100644
--- a/src/core/ext/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
-#include "src/core/ext/client_channel/client_channel_factory.h"
-#include "src/core/ext/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"
@@ -131,4 +131,4 @@ grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy(
grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H */
diff --git a/src/core/ext/client_channel/lb_policy_registry.c b/src/core/ext/filters/client_channel/lb_policy_registry.c
index 90c149d947..1376673bca 100644
--- a/src/core/ext/client_channel/lb_policy_registry.c
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include <string.h>
diff --git a/src/core/ext/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h
index 21c468e021..dfdec1347f 100644
--- a/src/core/ext/client_channel/lb_policy_registry.h
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H
-#include "src/core/ext/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
/** Initialize the registry and set \a default_factory as the factory to be
@@ -52,4 +52,4 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory);
grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
grpc_lb_policy_args *args);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H */
diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/filters/client_channel/parse_address.c
index cd1b2cd80c..0c97062075 100644
--- a/src/core/ext/client_channel/parse_address.c
+++ b/src/core/ext/filters/client_channel/parse_address.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include <stdio.h>
diff --git a/src/core/ext/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h
index bf99c5298a..c8d77baa00 100644
--- a/src/core/ext/client_channel/parse_address.h
+++ b/src/core/ext/filters/client_channel/parse_address.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H
#include <stddef.h>
-#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/iomgr/resolve_address.h"
/** Populate \a addr and \a len from \a uri, whose path is expected to contain a
@@ -51,4 +51,4 @@ int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr);
* host:port pair. Returns true upon success. */
int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */
diff --git a/src/core/ext/client_channel/proxy_mapper.c b/src/core/ext/filters/client_channel/proxy_mapper.c
index f92afe847b..6dddd3c6da 100644
--- a/src/core/ext/client_channel/proxy_mapper.c
+++ b/src/core/ext/filters/client_channel/proxy_mapper.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/proxy_mapper.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper.h"
void grpc_proxy_mapper_init(const grpc_proxy_mapper_vtable* vtable,
grpc_proxy_mapper* mapper) {
diff --git a/src/core/ext/client_channel/proxy_mapper.h b/src/core/ext/filters/client_channel/proxy_mapper.h
index 6e4607fe4d..d0c47d3090 100644
--- a/src/core/ext/client_channel/proxy_mapper.h
+++ b/src/core/ext/filters/client_channel/proxy_mapper.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_PROXY_MAPPER_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_PROXY_MAPPER_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PROXY_MAPPER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PROXY_MAPPER_H
#include <stdbool.h>
@@ -86,4 +86,4 @@ bool grpc_proxy_mapper_map_address(grpc_exec_ctx* exec_ctx,
void grpc_proxy_mapper_destroy(grpc_proxy_mapper* mapper);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_PROXY_MAPPER_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PROXY_MAPPER_H */
diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/filters/client_channel/proxy_mapper_registry.c
index 0935ddbdbd..7a39289a84 100644
--- a/src/core/ext/client_channel/proxy_mapper_registry.c
+++ b/src/core/ext/filters/client_channel/proxy_mapper_registry.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include <string.h>
diff --git a/src/core/ext/client_channel/proxy_mapper_registry.h b/src/core/ext/filters/client_channel/proxy_mapper_registry.h
index 742b57a2d4..8b5686d664 100644
--- a/src/core/ext/client_channel/proxy_mapper_registry.h
+++ b/src/core/ext/filters/client_channel/proxy_mapper_registry.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_PROXY_MAPPER_REGISTRY_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_PROXY_MAPPER_REGISTRY_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PROXY_MAPPER_REGISTRY_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PROXY_MAPPER_REGISTRY_H
-#include "src/core/ext/client_channel/proxy_mapper.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper.h"
void grpc_proxy_mapper_registry_init();
void grpc_proxy_mapper_registry_shutdown();
@@ -56,4 +56,4 @@ bool grpc_proxy_mappers_map_address(grpc_exec_ctx* exec_ctx,
grpc_resolved_address** new_address,
grpc_channel_args** new_args);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_PROXY_MAPPER_REGISTRY_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PROXY_MAPPER_REGISTRY_H */
diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/filters/client_channel/resolver.c
index b1a1faa6c9..cafa2837f3 100644
--- a/src/core/ext/client_channel/resolver.c
+++ b/src/core/ext/filters/client_channel/resolver.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/resolver.h"
+#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/iomgr/combiner.h"
void grpc_resolver_init(grpc_resolver *resolver,
diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h
index bbba424ca5..55f9124b3a 100644
--- a/src/core/ext/client_channel/resolver.h
+++ b/src/core/ext/filters/client_channel/resolver.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/iomgr/iomgr.h"
typedef struct grpc_resolver grpc_resolver;
@@ -98,4 +98,4 @@ void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
grpc_channel_args **result,
grpc_closure *on_complete);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H */
diff --git a/src/core/ext/resolver/README.md b/src/core/ext/filters/client_channel/resolver/README.md
index b0e234e96a..b0e234e96a 100644
--- a/src/core/ext/resolver/README.md
+++ b/src/core/ext/filters/client_channel/resolver/README.md
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
new file mode 100644
index 0000000000..ffaeeed324
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
@@ -0,0 +1,350 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+#if GRPC_ARES == 1 && !defined(GRPC_UV)
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/support/backoff.h"
+#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
+
+#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
+#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_DNS_RECONNECT_JITTER 0.2
+
+typedef struct {
+ /** base class: must be first */
+ grpc_resolver base;
+ /** name to resolve (usually the same as target_name) */
+ char *name_to_resolve;
+ /** default port to use */
+ char *default_port;
+ /** channel args. */
+ grpc_channel_args *channel_args;
+ /** pollset_set to drive the name resolution process */
+ grpc_pollset_set *interested_parties;
+
+ /** Closures used by the combiner */
+ grpc_closure dns_ares_on_retry_timer_locked;
+ grpc_closure dns_ares_on_resolved_locked;
+
+ /** Combiner guarding the rest of the state */
+ grpc_combiner *combiner;
+ /** are we currently resolving? */
+ bool resolving;
+ /** which version of the result have we published? */
+ int published_version;
+ /** which version of the result is current? */
+ int resolved_version;
+ /** pending next completion, or NULL */
+ grpc_closure *next_completion;
+ /** target result address for next completion */
+ grpc_channel_args **target_result;
+ /** current (fully resolved) result */
+ grpc_channel_args *resolved_result;
+ /** retry timer */
+ bool have_retry_timer;
+ grpc_timer retry_timer;
+ /** retry backoff state */
+ gpr_backoff backoff_state;
+
+ /** currently resolving addresses */
+ grpc_resolved_addresses *addresses;
+} ares_dns_resolver;
+
+static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+
+static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx,
+ ares_dns_resolver *r);
+static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ ares_dns_resolver *r);
+
+static void dns_ares_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void dns_ares_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *r);
+static void dns_ares_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete);
+
+static const grpc_resolver_vtable dns_ares_resolver_vtable = {
+ dns_ares_destroy, dns_ares_shutdown_locked,
+ dns_ares_channel_saw_error_locked, dns_ares_next_locked};
+
+static void dns_ares_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ ares_dns_resolver *r = (ares_dns_resolver *)resolver;
+ if (r->have_retry_timer) {
+ grpc_timer_cancel(exec_ctx, &r->retry_timer);
+ }
+ if (r->next_completion != NULL) {
+ *r->target_result = NULL;
+ grpc_closure_sched(
+ exec_ctx, r->next_completion,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown"));
+ r->next_completion = NULL;
+ }
+}
+
+static void dns_ares_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ ares_dns_resolver *r = (ares_dns_resolver *)resolver;
+ if (!r->resolving) {
+ gpr_backoff_reset(&r->backoff_state);
+ dns_ares_start_resolving_locked(exec_ctx, r);
+ }
+}
+
+static void dns_ares_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ ares_dns_resolver *r = arg;
+ r->have_retry_timer = false;
+ if (error == GRPC_ERROR_NONE) {
+ if (!r->resolving) {
+ dns_ares_start_resolving_locked(exec_ctx, r);
+ }
+ }
+ GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer");
+}
+
+static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ ares_dns_resolver *r = arg;
+ grpc_channel_args *result = NULL;
+ GPR_ASSERT(r->resolving);
+ r->resolving = false;
+ if (r->addresses != NULL) {
+ grpc_lb_addresses *addresses = grpc_lb_addresses_create(
+ r->addresses->naddrs, NULL /* user_data_vtable */);
+ for (size_t i = 0; i < r->addresses->naddrs; ++i) {
+ grpc_lb_addresses_set_address(
+ addresses, i, &r->addresses->addrs[i].addr,
+ r->addresses->addrs[i].len, false /* is_balancer */,
+ NULL /* balancer_name */, NULL /* user_data */);
+ }
+ grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses);
+ result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1);
+ grpc_resolved_addresses_destroy(r->addresses);
+ grpc_lb_addresses_destroy(exec_ctx, addresses);
+ } else {
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
+ gpr_timespec timeout = gpr_time_sub(next_try, now);
+ gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
+ grpc_error_string(error));
+ GPR_ASSERT(!r->have_retry_timer);
+ r->have_retry_timer = true;
+ GRPC_RESOLVER_REF(&r->base, "retry-timer");
+ if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
+ gpr_log(GPR_DEBUG, "retrying in %" PRId64 ".%09d seconds", timeout.tv_sec,
+ timeout.tv_nsec);
+ } else {
+ gpr_log(GPR_DEBUG, "retrying immediately");
+ }
+ grpc_timer_init(exec_ctx, &r->retry_timer, next_try,
+ &r->dns_ares_on_retry_timer_locked, now);
+ }
+ if (r->resolved_result != NULL) {
+ grpc_channel_args_destroy(exec_ctx, r->resolved_result);
+ }
+ r->resolved_result = result;
+ r->resolved_version++;
+ dns_ares_maybe_finish_next_locked(exec_ctx, r);
+ GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving");
+}
+
+static void dns_ares_next_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete) {
+ gpr_log(GPR_DEBUG, "dns_ares_next is called.");
+ ares_dns_resolver *r = (ares_dns_resolver *)resolver;
+ GPR_ASSERT(!r->next_completion);
+ r->next_completion = on_complete;
+ r->target_result = target_result;
+ if (r->resolved_version == 0 && !r->resolving) {
+ gpr_backoff_reset(&r->backoff_state);
+ dns_ares_start_resolving_locked(exec_ctx, r);
+ } else {
+ dns_ares_maybe_finish_next_locked(exec_ctx, r);
+ }
+}
+
+static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx,
+ ares_dns_resolver *r) {
+ GRPC_RESOLVER_REF(&r->base, "dns-resolving");
+ GPR_ASSERT(!r->resolving);
+ r->resolving = true;
+ r->addresses = NULL;
+ grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port,
+ r->interested_parties, &r->dns_ares_on_resolved_locked,
+ &r->addresses);
+}
+
+static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ ares_dns_resolver *r) {
+ if (r->next_completion != NULL &&
+ r->resolved_version != r->published_version) {
+ *r->target_result = r->resolved_result == NULL
+ ? NULL
+ : grpc_channel_args_copy(r->resolved_result);
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
+ r->next_completion = NULL;
+ r->published_version = r->resolved_version;
+ }
+}
+
+static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
+ gpr_log(GPR_DEBUG, "dns_ares_destroy");
+ ares_dns_resolver *r = (ares_dns_resolver *)gr;
+ if (r->resolved_result != NULL) {
+ grpc_channel_args_destroy(exec_ctx, r->resolved_result);
+ }
+ grpc_pollset_set_destroy(exec_ctx, r->interested_parties);
+ gpr_free(r->name_to_resolve);
+ gpr_free(r->default_port);
+ grpc_channel_args_destroy(exec_ctx, r->channel_args);
+ gpr_free(r);
+}
+
+static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx,
+ grpc_resolver_args *args,
+ const char *default_port) {
+ // Get name from args.
+ const char *path = args->uri->path;
+ if (0 != strcmp(args->uri->authority, "")) {
+ gpr_log(GPR_ERROR, "authority based dns uri's not supported");
+ return NULL;
+ }
+ if (path[0] == '/') ++path;
+ // Create resolver.
+ ares_dns_resolver *r = gpr_zalloc(sizeof(ares_dns_resolver));
+ grpc_resolver_init(&r->base, &dns_ares_resolver_vtable, args->combiner);
+ r->name_to_resolve = gpr_strdup(path);
+ r->default_port = gpr_strdup(default_port);
+ r->channel_args = grpc_channel_args_copy(args->args);
+ r->interested_parties = grpc_pollset_set_create();
+ if (args->pollset_set != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties,
+ args->pollset_set);
+ }
+ gpr_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS,
+ GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_DNS_RECONNECT_JITTER,
+ GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
+ GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_closure_init(&r->dns_ares_on_retry_timer_locked,
+ dns_ares_on_retry_timer_locked, r,
+ grpc_combiner_scheduler(r->base.combiner, false));
+ grpc_closure_init(&r->dns_ares_on_resolved_locked,
+ dns_ares_on_resolved_locked, r,
+ grpc_combiner_scheduler(r->base.combiner, false));
+ return &r->base;
+}
+
+/*
+ * FACTORY
+ */
+
+static void dns_ares_factory_ref(grpc_resolver_factory *factory) {}
+
+static void dns_ares_factory_unref(grpc_resolver_factory *factory) {}
+
+static grpc_resolver *dns_factory_create_resolver(
+ grpc_exec_ctx *exec_ctx, grpc_resolver_factory *factory,
+ grpc_resolver_args *args) {
+ return dns_ares_create(exec_ctx, args, "https");
+}
+
+static char *dns_ares_factory_get_default_host_name(
+ grpc_resolver_factory *factory, grpc_uri *uri) {
+ const char *path = uri->path;
+ if (path[0] == '/') ++path;
+ return gpr_strdup(path);
+}
+
+static const grpc_resolver_factory_vtable dns_ares_factory_vtable = {
+ dns_ares_factory_ref, dns_ares_factory_unref, dns_factory_create_resolver,
+ dns_ares_factory_get_default_host_name, "dns"};
+static grpc_resolver_factory dns_resolver_factory = {&dns_ares_factory_vtable};
+
+static grpc_resolver_factory *dns_ares_resolver_factory_create() {
+ return &dns_resolver_factory;
+}
+
+void grpc_resolver_dns_ares_init(void) {
+ char *resolver = gpr_getenv("GRPC_DNS_RESOLVER");
+ /* TODO(zyc): Turn on c-ares based resolver by default after the address
+ sorter and the CNAME support are added. */
+ if (resolver != NULL && gpr_stricmp(resolver, "ares") == 0) {
+ grpc_error *error = grpc_ares_init();
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_LOG_IF_ERROR("ares_library_init() failed", error);
+ return;
+ }
+ grpc_resolve_address = grpc_resolve_address_ares;
+ grpc_register_resolver_type(dns_ares_resolver_factory_create());
+ }
+ gpr_free(resolver);
+}
+
+void grpc_resolver_dns_ares_shutdown(void) {
+ char *resolver = gpr_getenv("GRPC_DNS_RESOLVER");
+ if (resolver != NULL && gpr_stricmp(resolver, "ares") == 0) {
+ grpc_ares_cleanup();
+ }
+ gpr_free(resolver);
+}
+
+#else /* GRPC_ARES == 1 && !defined(GRPC_UV) */
+
+void grpc_resolver_dns_ares_init(void) {}
+
+void grpc_resolver_dns_ares_shutdown(void) {}
+
+#endif /* GRPC_ARES == 1 && !defined(GRPC_UV) */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
new file mode 100644
index 0000000000..aa88940021
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H
+
+#include <ares.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+
+typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
+
+/* Start \a ev_driver. It will keep working until all IO on its ares_channel is
+ done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks
+ bound to its ares_channel when necessary. */
+void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx,
+ grpc_ares_ev_driver *ev_driver);
+
+/* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to
+ \a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the
+ query. */
+ares_channel *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver);
+
+/* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is
+ created successfully. */
+grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
+ grpc_pollset_set *pollset_set);
+
+/* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver
+ will be cancelled and their on_done callbacks will be invoked with a status
+ of ARES_ECANCELLED. */
+void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver);
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \
+ */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
new file mode 100644
index 0000000000..1e4f3eb5ab
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
@@ -0,0 +1,319 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
+#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET)
+
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/support/string.h"
+
+typedef struct fd_node {
+ /** the owner of this fd node */
+ grpc_ares_ev_driver *ev_driver;
+ /** the grpc_fd owned by this fd node */
+ grpc_fd *grpc_fd;
+ /** a closure wrapping on_readable_cb, which should be invoked when the
+ grpc_fd in this node becomes readable. */
+ grpc_closure read_closure;
+ /** a closure wrapping on_writable_cb, which should be invoked when the
+ grpc_fd in this node becomes writable. */
+ grpc_closure write_closure;
+ /** next fd node in the list */
+ struct fd_node *next;
+
+ /** mutex guarding the rest of the state */
+ gpr_mu mu;
+ /** if the readable closure has been registered */
+ bool readable_registered;
+ /** if the writable closure has been registered */
+ bool writable_registered;
+} fd_node;
+
+struct grpc_ares_ev_driver {
+ /** the ares_channel owned by this event driver */
+ ares_channel channel;
+ /** pollset set for driving the IO events of the channel */
+ grpc_pollset_set *pollset_set;
+ /** refcount of the event driver */
+ gpr_refcount refs;
+
+ /** mutex guarding the rest of the state */
+ gpr_mu mu;
+ /** a list of grpc_fd that this event driver is currently using. */
+ fd_node *fds;
+ /** is this event driver currently working? */
+ bool working;
+ /** is this event driver being shut down */
+ bool shutting_down;
+};
+
+static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
+ grpc_ares_ev_driver *ev_driver);
+
+static grpc_ares_ev_driver *grpc_ares_ev_driver_ref(
+ grpc_ares_ev_driver *ev_driver) {
+ gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
+ gpr_ref(&ev_driver->refs);
+ return ev_driver;
+}
+
+static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) {
+ gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
+ if (gpr_unref(&ev_driver->refs)) {
+ gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
+ GPR_ASSERT(ev_driver->fds == NULL);
+ gpr_mu_destroy(&ev_driver->mu);
+ ares_destroy(ev_driver->channel);
+ gpr_free(ev_driver);
+ }
+}
+
+static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) {
+ gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ GPR_ASSERT(!fdn->readable_registered);
+ GPR_ASSERT(!fdn->writable_registered);
+ gpr_mu_destroy(&fdn->mu);
+ grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd);
+ grpc_fd_shutdown(exec_ctx, fdn->grpc_fd,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("fd node destroyed"));
+ grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, "c-ares query finished");
+ gpr_free(fdn);
+}
+
+grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
+ grpc_pollset_set *pollset_set) {
+ *ev_driver = gpr_malloc(sizeof(grpc_ares_ev_driver));
+ int status = ares_init(&(*ev_driver)->channel);
+ gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create");
+ if (status != ARES_SUCCESS) {
+ char *err_msg;
+ gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
+ ares_strerror(status));
+ grpc_error *err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg);
+ gpr_free(err_msg);
+ gpr_free(*ev_driver);
+ return err;
+ }
+ gpr_mu_init(&(*ev_driver)->mu);
+ gpr_ref_init(&(*ev_driver)->refs, 1);
+ (*ev_driver)->pollset_set = pollset_set;
+ (*ev_driver)->fds = NULL;
+ (*ev_driver)->working = false;
+ (*ev_driver)->shutting_down = false;
+ return GRPC_ERROR_NONE;
+}
+
+void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver) {
+ // It's not safe to shut down remaining fds here directly, becauses
+ // ares_host_callback does not provide an exec_ctx. We mark the event driver
+ // as being shut down. If the event driver is working,
+ // grpc_ares_notify_on_event_locked will shut down the fds; if it's not
+ // working, there are no fds to shut down.
+ gpr_mu_lock(&ev_driver->mu);
+ ev_driver->shutting_down = true;
+ gpr_mu_unlock(&ev_driver->mu);
+ grpc_ares_ev_driver_unref(ev_driver);
+}
+
+// Search fd in the fd_node list head. This is an O(n) search, the max possible
+// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
+static fd_node *pop_fd_node(fd_node **head, int fd) {
+ fd_node dummy_head;
+ dummy_head.next = *head;
+ fd_node *node = &dummy_head;
+ while (node->next != NULL) {
+ if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) {
+ fd_node *ret = node->next;
+ node->next = node->next->next;
+ *head = dummy_head.next;
+ return ret;
+ }
+ node = node->next;
+ }
+ return NULL;
+}
+
+static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ fd_node *fdn = arg;
+ grpc_ares_ev_driver *ev_driver = fdn->ev_driver;
+ gpr_mu_lock(&fdn->mu);
+ fdn->readable_registered = false;
+ gpr_mu_unlock(&fdn->mu);
+
+ gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ if (error == GRPC_ERROR_NONE) {
+ ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd),
+ ARES_SOCKET_BAD);
+ } else {
+ // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
+ // timed out. The pending lookups made on this ev_driver will be cancelled
+ // by the following ares_cancel() and the on_done callbacks will be invoked
+ // with a status of ARES_ECANCELLED. The remaining file descriptors in this
+ // ev_driver will be cleaned up in the follwing
+ // grpc_ares_notify_on_event_locked().
+ ares_cancel(ev_driver->channel);
+ }
+ gpr_mu_lock(&ev_driver->mu);
+ grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
+ gpr_mu_unlock(&ev_driver->mu);
+ grpc_ares_ev_driver_unref(ev_driver);
+}
+
+static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ fd_node *fdn = arg;
+ grpc_ares_ev_driver *ev_driver = fdn->ev_driver;
+ gpr_mu_lock(&fdn->mu);
+ fdn->writable_registered = false;
+ gpr_mu_unlock(&fdn->mu);
+
+ gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ if (error == GRPC_ERROR_NONE) {
+ ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD,
+ grpc_fd_wrapped_fd(fdn->grpc_fd));
+ } else {
+ // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
+ // timed out. The pending lookups made on this ev_driver will be cancelled
+ // by the following ares_cancel() and the on_done callbacks will be invoked
+ // with a status of ARES_ECANCELLED. The remaining file descriptors in this
+ // ev_driver will be cleaned up in the follwing
+ // grpc_ares_notify_on_event_locked().
+ ares_cancel(ev_driver->channel);
+ }
+ gpr_mu_lock(&ev_driver->mu);
+ grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
+ gpr_mu_unlock(&ev_driver->mu);
+ grpc_ares_ev_driver_unref(ev_driver);
+}
+
+ares_channel *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) {
+ return &ev_driver->channel;
+}
+
+// Get the file descriptors used by the ev_driver's ares channel, register
+// driver_closure with these filedescriptors.
+static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
+ grpc_ares_ev_driver *ev_driver) {
+ fd_node *new_list = NULL;
+ if (!ev_driver->shutting_down) {
+ ares_socket_t socks[ARES_GETSOCK_MAXNUM];
+ int socks_bitmask =
+ ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
+ for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
+ if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
+ ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
+ fd_node *fdn = pop_fd_node(&ev_driver->fds, socks[i]);
+ // Create a new fd_node if sock[i] is not in the fd_node list.
+ if (fdn == NULL) {
+ char *fd_name;
+ gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
+ fdn = gpr_malloc(sizeof(fd_node));
+ gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
+ fdn->grpc_fd = grpc_fd_create(socks[i], fd_name);
+ fdn->ev_driver = ev_driver;
+ fdn->readable_registered = false;
+ fdn->writable_registered = false;
+ gpr_mu_init(&fdn->mu);
+ grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set,
+ fdn->grpc_fd);
+ gpr_free(fd_name);
+ }
+ fdn->next = new_list;
+ new_list = fdn;
+ gpr_mu_lock(&fdn->mu);
+ // Register read_closure if the socket is readable and read_closure has
+ // not been registered with this socket.
+ if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
+ !fdn->readable_registered) {
+ grpc_ares_ev_driver_ref(ev_driver);
+ gpr_log(GPR_DEBUG, "notify read on: %d",
+ grpc_fd_wrapped_fd(fdn->grpc_fd));
+ grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure);
+ fdn->readable_registered = true;
+ }
+ // Register write_closure if the socket is writable and write_closure
+ // has not been registered with this socket.
+ if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
+ !fdn->writable_registered) {
+ gpr_log(GPR_DEBUG, "notify write on: %d",
+ grpc_fd_wrapped_fd(fdn->grpc_fd));
+ grpc_ares_ev_driver_ref(ev_driver);
+ grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure);
+ fdn->writable_registered = true;
+ }
+ gpr_mu_unlock(&fdn->mu);
+ }
+ }
+ }
+ // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and
+ // are therefore no longer in use, so they can be shut down and removed from
+ // the list.
+ while (ev_driver->fds != NULL) {
+ fd_node *cur = ev_driver->fds;
+ ev_driver->fds = ev_driver->fds->next;
+ fd_node_destroy(exec_ctx, cur);
+ }
+ ev_driver->fds = new_list;
+ // If the ev driver has no working fd, all the tasks are done.
+ if (new_list == NULL) {
+ ev_driver->working = false;
+ gpr_log(GPR_DEBUG, "ev driver stop working");
+ }
+}
+
+void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx,
+ grpc_ares_ev_driver *ev_driver) {
+ gpr_mu_lock(&ev_driver->mu);
+ if (!ev_driver->working) {
+ ev_driver->working = true;
+ grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
+ }
+ gpr_mu_unlock(&ev_driver->mu);
+}
+
+#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
new file mode 100644
index 0000000000..09c46a66e0
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -0,0 +1,289 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+#if GRPC_ARES == 1 && !defined(GRPC_UV)
+
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils_posix.h"
+
+#include <string.h>
+#include <sys/types.h>
+
+#include <ares.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+#include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/support/string.h"
+
+static gpr_once g_basic_init = GPR_ONCE_INIT;
+static gpr_mu g_init_mu;
+
+typedef struct grpc_ares_request {
+ /** following members are set in grpc_resolve_address_ares_impl */
+ /** host to resolve, parsed from the name to resolve */
+ char *host;
+ /** port to fill in sockaddr_in, parsed from the name to resolve */
+ char *port;
+ /** default port to use */
+ char *default_port;
+ /** closure to call when the request completes */
+ grpc_closure *on_done;
+ /** the pointer to receive the resolved addresses */
+ grpc_resolved_addresses **addrs_out;
+ /** the evernt driver used by this request */
+ grpc_ares_ev_driver *ev_driver;
+ /** number of ongoing queries */
+ gpr_refcount pending_queries;
+
+ /** mutex guarding the rest of the state */
+ gpr_mu mu;
+ /** is there at least one successful query, set in on_done_cb */
+ bool success;
+ /** the errors explaining the request failure, set in on_done_cb */
+ grpc_error *error;
+} grpc_ares_request;
+
+static void do_basic_init(void) { gpr_mu_init(&g_init_mu); }
+
+static uint16_t strhtons(const char *port) {
+ if (strcmp(port, "http") == 0) {
+ return htons(80);
+ } else if (strcmp(port, "https") == 0) {
+ return htons(443);
+ }
+ return htons((unsigned short)atoi(port));
+}
+
+static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx,
+ grpc_ares_request *r) {
+ /* If there are no pending queries, invoke on_done callback and destroy the
+ request */
+ if (gpr_unref(&r->pending_queries)) {
+ /* TODO(zyc): Sort results with RFC6724 before invoking on_done. */
+ if (exec_ctx == NULL) {
+ /* A new exec_ctx is created here, as the c-ares interface does not
+ provide one in ares_host_callback. It's safe to schedule on_done with
+ the newly created exec_ctx, since the caller has been warned not to
+ acquire locks in on_done. ares_dns_resolver is using combiner to
+ protect resources needed by on_done. */
+ grpc_exec_ctx new_exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_closure_sched(&new_exec_ctx, r->on_done, r->error);
+ grpc_exec_ctx_finish(&new_exec_ctx);
+ } else {
+ grpc_closure_sched(exec_ctx, r->on_done, r->error);
+ }
+ gpr_mu_destroy(&r->mu);
+ grpc_ares_ev_driver_destroy(r->ev_driver);
+ gpr_free(r->host);
+ gpr_free(r->port);
+ gpr_free(r->default_port);
+ gpr_free(r);
+ }
+}
+
+static void on_done_cb(void *arg, int status, int timeouts,
+ struct hostent *hostent) {
+ grpc_ares_request *r = (grpc_ares_request *)arg;
+ gpr_mu_lock(&r->mu);
+ if (status == ARES_SUCCESS) {
+ GRPC_ERROR_UNREF(r->error);
+ r->error = GRPC_ERROR_NONE;
+ r->success = true;
+ grpc_resolved_addresses **addresses = r->addrs_out;
+ if (*addresses == NULL) {
+ *addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
+ (*addresses)->naddrs = 0;
+ (*addresses)->addrs = NULL;
+ }
+ size_t prev_naddr = (*addresses)->naddrs;
+ size_t i;
+ for (i = 0; hostent->h_addr_list[i] != NULL; i++) {
+ }
+ (*addresses)->naddrs += i;
+ (*addresses)->addrs =
+ gpr_realloc((*addresses)->addrs,
+ sizeof(grpc_resolved_address) * (*addresses)->naddrs);
+ for (i = prev_naddr; i < (*addresses)->naddrs; i++) {
+ memset(&(*addresses)->addrs[i], 0, sizeof(grpc_resolved_address));
+ if (hostent->h_addrtype == AF_INET6) {
+ (*addresses)->addrs[i].len = sizeof(struct sockaddr_in6);
+ struct sockaddr_in6 *addr =
+ (struct sockaddr_in6 *)&(*addresses)->addrs[i].addr;
+ addr->sin6_family = (sa_family_t)hostent->h_addrtype;
+ addr->sin6_port = strhtons(r->port);
+
+ char output[INET6_ADDRSTRLEN];
+ memcpy(&addr->sin6_addr, hostent->h_addr_list[i - prev_naddr],
+ sizeof(struct in6_addr));
+ ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN);
+ gpr_log(GPR_DEBUG,
+ "c-ares resolver gets a AF_INET6 result: \n"
+ " addr: %s\n port: %s\n sin6_scope_id: %d\n",
+ output, r->port, addr->sin6_scope_id);
+ } else {
+ (*addresses)->addrs[i].len = sizeof(struct sockaddr_in);
+ struct sockaddr_in *addr =
+ (struct sockaddr_in *)&(*addresses)->addrs[i].addr;
+ memcpy(&addr->sin_addr, hostent->h_addr_list[i - prev_naddr],
+ sizeof(struct in_addr));
+ addr->sin_family = (sa_family_t)hostent->h_addrtype;
+ addr->sin_port = strhtons(r->port);
+
+ char output[INET_ADDRSTRLEN];
+ ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN);
+ gpr_log(GPR_DEBUG,
+ "c-ares resolver gets a AF_INET result: \n"
+ " addr: %s\n port: %s\n",
+ output, r->port);
+ }
+ }
+ } else if (!r->success) {
+ char *error_msg;
+ gpr_asprintf(&error_msg, "C-ares status is not ARES_SUCCESS: %s",
+ ares_strerror(status));
+ grpc_error *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
+ gpr_free(error_msg);
+ if (r->error == GRPC_ERROR_NONE) {
+ r->error = error;
+ } else {
+ r->error = grpc_error_add_child(error, r->error);
+ }
+ }
+ gpr_mu_unlock(&r->mu);
+ grpc_ares_request_unref(NULL, r);
+}
+
+void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port,
+ grpc_pollset_set *interested_parties,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) {
+ /* TODO(zyc): Enable tracing after #9603 is checked in */
+ /* if (grpc_dns_trace) {
+ gpr_log(GPR_DEBUG, "resolve_address (blocking): name=%s, default_port=%s",
+ name, default_port);
+ } */
+
+ /* parse name, splitting it into host and port parts */
+ char *host;
+ char *port;
+ gpr_split_host_port(name, &host, &port);
+ if (host == NULL) {
+ grpc_error *err = grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("unparseable host:port"),
+ GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name));
+ grpc_closure_sched(exec_ctx, on_done, err);
+ goto error_cleanup;
+ } else if (port == NULL) {
+ if (default_port == NULL) {
+ grpc_error *err = grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("no port in name"),
+ GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name));
+ grpc_closure_sched(exec_ctx, on_done, err);
+ goto error_cleanup;
+ }
+ port = gpr_strdup(default_port);
+ }
+
+ grpc_ares_ev_driver *ev_driver;
+ grpc_error *err = grpc_ares_ev_driver_create(&ev_driver, interested_parties);
+ if (err != GRPC_ERROR_NONE) {
+ GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", err);
+ goto error_cleanup;
+ }
+
+ grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request));
+ gpr_mu_init(&r->mu);
+ r->ev_driver = ev_driver;
+ r->on_done = on_done;
+ r->addrs_out = addrs;
+ r->default_port = gpr_strdup(default_port);
+ r->port = port;
+ r->host = host;
+ r->success = false;
+ r->error = GRPC_ERROR_NONE;
+ ares_channel *channel = grpc_ares_ev_driver_get_channel(r->ev_driver);
+ gpr_ref_init(&r->pending_queries, 2);
+ if (grpc_ipv6_loopback_available()) {
+ gpr_ref(&r->pending_queries);
+ ares_gethostbyname(*channel, r->host, AF_INET6, on_done_cb, r);
+ }
+ ares_gethostbyname(*channel, r->host, AF_INET, on_done_cb, r);
+ /* TODO(zyc): Handle CNAME records here. */
+ grpc_ares_ev_driver_start(exec_ctx, r->ev_driver);
+ grpc_ares_request_unref(exec_ctx, r);
+ return;
+
+error_cleanup:
+ gpr_free(host);
+ gpr_free(port);
+}
+
+void (*grpc_resolve_address_ares)(
+ grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
+ grpc_pollset_set *interested_parties, grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) = grpc_resolve_address_ares_impl;
+
+grpc_error *grpc_ares_init(void) {
+ gpr_once_init(&g_basic_init, do_basic_init);
+ gpr_mu_lock(&g_init_mu);
+ int status = ares_library_init(ARES_LIB_INIT_ALL);
+ gpr_mu_unlock(&g_init_mu);
+
+ if (status != ARES_SUCCESS) {
+ char *error_msg;
+ gpr_asprintf(&error_msg, "ares_library_init failed: %s",
+ ares_strerror(status));
+ grpc_error *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
+ gpr_free(error_msg);
+ return error;
+ }
+ return GRPC_ERROR_NONE;
+}
+
+void grpc_ares_cleanup(void) {
+ gpr_mu_lock(&g_init_mu);
+ ares_library_cleanup();
+ gpr_mu_unlock(&g_init_mu);
+}
+
+#endif /* GRPC_ARES == 1 && !defined(GRPC_UV) */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
new file mode 100644
index 0000000000..3dd40ea268
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+
+/* Asynchronously resolve addr. Use \a default_port if a port isn't designated
+ in addr, otherwise use the port in addr. grpc_ares_init() must be called at
+ least once before this function. \a on_done may be called directly in this
+ function without being scheduled with \a exec_ctx, it must not try to acquire
+ locks that are being held by the caller. */
+extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx,
+ const char *addr,
+ const char *default_port,
+ grpc_pollset_set *interested_parties,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addresses);
+
+/* Initialize gRPC ares wrapper. Must be called at least once before
+ grpc_resolve_address_ares(). */
+grpc_error *grpc_ares_init(void);
+
+/* Uninitialized gRPC ares wrapper. If there was more than one previous call to
+ grpc_ares_init(), this function uninitializes the gRPC ares wrapper only if
+ it has been called the same number of times as grpc_ares_init(). */
+void grpc_ares_cleanup(void);
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \
+ */
diff --git a/src/core/ext/resolver/dns/native/README.md b/src/core/ext/filters/client_channel/resolver/dns/native/README.md
index 695de47b9f..695de47b9f 100644
--- a/src/core/ext/resolver/dns/native/README.md
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/README.md
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
index d6cb6a95a2..ebe6a07215 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
@@ -37,13 +37,14 @@
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/lb_policy_registry.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/backoff.h"
+#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
@@ -304,7 +305,21 @@ static grpc_resolver_factory *dns_resolver_factory_create() {
}
void grpc_resolver_dns_native_init(void) {
- grpc_register_resolver_type(dns_resolver_factory_create());
+ char *resolver = gpr_getenv("GRPC_DNS_RESOLVER");
+ if (resolver != NULL && gpr_stricmp(resolver, "native") == 0) {
+ gpr_log(GPR_DEBUG, "Using native dns resolver");
+ grpc_register_resolver_type(dns_resolver_factory_create());
+ } else {
+ grpc_resolver_factory *existing_factory =
+ grpc_resolver_factory_lookup("dns");
+ if (existing_factory == NULL) {
+ gpr_log(GPR_DEBUG, "Using native dns resolver");
+ grpc_register_resolver_type(dns_resolver_factory_create());
+ } else {
+ grpc_resolver_factory_unref(existing_factory);
+ }
+ }
+ gpr_free(resolver);
}
void grpc_resolver_dns_native_shutdown(void) {}
diff --git a/src/core/ext/resolver/sockaddr/README.md b/src/core/ext/filters/client_channel/resolver/sockaddr/README.md
index e307ba88f5..e307ba88f5 100644
--- a/src/core/ext/resolver/sockaddr/README.md
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/README.md
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
index da5923d26f..54f020d691 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
@@ -41,9 +41,9 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/lb_policy_factory.h"
-#include "src/core/ext/client_channel/parse_address.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
diff --git a/src/core/ext/client_channel/resolver_factory.c b/src/core/ext/filters/client_channel/resolver_factory.c
index 00bbb92dd0..07f9e4cf03 100644
--- a/src/core/ext/client_channel/resolver_factory.c
+++ b/src/core/ext/filters/client_channel/resolver_factory.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/resolver_factory.h"
+#include "src/core/ext/filters/client_channel/resolver_factory.h"
void grpc_resolver_factory_ref(grpc_resolver_factory* factory) {
factory->vtable->ref(factory);
diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/filters/client_channel/resolver_factory.h
index e3cd99ec5a..f2fcfc06fe 100644
--- a/src/core/ext/client_channel/resolver_factory.h
+++ b/src/core/ext/filters/client_channel/resolver_factory.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FACTORY_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FACTORY_H
-#include "src/core/ext/client_channel/client_channel_factory.h"
-#include "src/core/ext/client_channel/resolver.h"
-#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/resolver.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/iomgr/pollset_set.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
@@ -82,4 +82,4 @@ grpc_resolver *grpc_resolver_factory_create_resolver(
char *grpc_resolver_factory_get_default_authority(
grpc_resolver_factory *factory, grpc_uri *uri);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FACTORY_H */
diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/filters/client_channel/resolver_registry.c
index 3c5a6fb3ff..fa997bd68f 100644
--- a/src/core/ext/client_channel/resolver_registry.c
+++ b/src/core/ext/filters/client_channel/resolver_registry.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include <string.h>
@@ -93,7 +93,6 @@ static grpc_resolver_factory *lookup_factory(const char *name) {
return g_all_of_the_resolvers[i];
}
}
-
return NULL;
}
diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/filters/client_channel/resolver_registry.h
index 1a3ebee25a..f2e9f9249c 100644
--- a/src/core/ext/client_channel/resolver_registry.h
+++ b/src/core/ext/filters/client_channel/resolver_registry.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_REGISTRY_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_REGISTRY_H
-#include "src/core/ext/client_channel/resolver_factory.h"
+#include "src/core/ext/filters/client_channel/resolver_factory.h"
#include "src/core/lib/iomgr/pollset_set.h"
void grpc_resolver_registry_init();
@@ -81,4 +81,4 @@ char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target);
char *grpc_resolver_factory_add_default_prefix_if_needed(
grpc_exec_ctx *exec_ctx, const char *target);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */
diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c
index 8926c3d782..dd78a17844 100644
--- a/src/core/ext/client_channel/retry_throttle.c
+++ b/src/core/ext/filters/client_channel/retry_throttle.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/retry_throttle.h"
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include <limits.h>
#include <string.h>
diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/filters/client_channel/retry_throttle.h
index f9971faf65..640865cf90 100644
--- a/src/core/ext/client_channel/retry_throttle.h
+++ b/src/core/ext/filters/client_channel/retry_throttle.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RETRY_THROTTLE_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RETRY_THROTTLE_H
#include <stdbool.h>
@@ -62,4 +62,4 @@ void grpc_retry_throttle_map_shutdown();
grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
const char* server_name, int max_milli_tokens, int milli_token_ratio);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RETRY_THROTTLE_H */
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index 063c0badff..8086cdb87e 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include <limits.h>
#include <string.h>
@@ -40,11 +40,11 @@
#include <grpc/support/avl.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/parse_address.h"
-#include "src/core/ext/client_channel/proxy_mapper_registry.h"
-#include "src/core/ext/client_channel/subchannel_index.h"
-#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -360,7 +360,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < c->args->num_args; i++) {
if (0 == strcmp(c->args->args[i].key,
"grpc.testing.fixed_reconnect_backoff_ms")) {
- GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
fixed_reconnect_backoff = true;
initial_backoff_ms = min_backoff_ms = max_backoff_ms =
grpc_channel_arg_get_integer(
@@ -749,11 +748,11 @@ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
+ top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, op);
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 3e64a2507c..6473de49b0 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
-#include "src/core/ext/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
@@ -157,7 +157,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call,
- grpc_transport_stream_op *op);
+ grpc_transport_stream_op_batch *op);
/** continue querying for peer */
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
@@ -200,4 +200,4 @@ const char *grpc_get_subchannel_address_uri_arg(const grpc_channel_args *args);
/// Caller is responsible for freeing the string.
grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */
diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c
index 11889300a2..f6ef4a845e 100644
--- a/src/core/ext/client_channel/subchannel_index.c
+++ b/src/core/ext/filters/client_channel/subchannel_index.c
@@ -31,7 +31,7 @@
//
//
-#include "src/core/ext/client_channel/subchannel_index.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include <stdbool.h>
#include <string.h>
diff --git a/src/core/ext/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index a67bd5e219..f673ade378 100644
--- a/src/core/ext/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
-#include "src/core/ext/client_channel/connector.h"
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
/** \file Provides an index of active subchannels so that they can be
shared amongst channels */
@@ -74,4 +74,4 @@ void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
void grpc_subchannel_index_shutdown(void);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H */
diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/filters/client_channel/uri_parser.c
index d385db0801..01b99911aa 100644
--- a/src/core/ext/client_channel/uri_parser.c
+++ b/src/core/ext/filters/client_channel/uri_parser.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
#include <string.h>
diff --git a/src/core/ext/client_channel/uri_parser.h b/src/core/ext/filters/client_channel/uri_parser.h
index efd4302c1c..2698d448d8 100644
--- a/src/core/ext/client_channel/uri_parser.h
+++ b/src/core/ext/filters/client_channel/uri_parser.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_URI_PARSER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_URI_PARSER_H
#include <stddef.h>
#include "src/core/lib/iomgr/exec_ctx.h"
@@ -62,4 +62,4 @@ const char *grpc_uri_get_query_arg(const grpc_uri *uri, const char *key);
/** destroy a uri */
void grpc_uri_destroy(grpc_uri *uri);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_URI_PARSER_H */
diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/filters/load_reporting/load_reporting.c
index 942aea4fd1..9fb33bab71 100644
--- a/src/core/ext/load_reporting/load_reporting.c
+++ b/src/core/ext/filters/load_reporting/load_reporting.c
@@ -38,8 +38,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>
-#include "src/core/ext/load_reporting/load_reporting.h"
-#include "src/core/ext/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/load_reporting.h"
+#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
diff --git a/src/core/ext/load_reporting/load_reporting.h b/src/core/ext/filters/load_reporting/load_reporting.h
index 22859a599a..e05a9dcc71 100644
--- a/src/core/ext/load_reporting/load_reporting.h
+++ b/src/core/ext/filters/load_reporting/load_reporting.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H
-#define GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H
+#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H
+#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H
#include <grpc/impl/codegen/grpc_types.h>
@@ -70,4 +70,4 @@ typedef struct grpc_load_reporting_call_data {
/** Return a \a grpc_arg enabling load reporting */
grpc_arg grpc_load_reporting_enable_arg();
-#endif /* GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H */
+#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H */
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/load_reporting_filter.c
index ea57c85c3a..57b25d0651 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/filters/load_reporting/load_reporting_filter.c
@@ -39,8 +39,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
-#include "src/core/ext/load_reporting/load_reporting.h"
-#include "src/core/ext/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/load_reporting.h"
+#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -183,25 +183,28 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
*/
}
-static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- GPR_TIMER_BEGIN("lr_start_transport_stream_op", 0);
+static void lr_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ GPR_TIMER_BEGIN("lr_start_transport_stream_op_batch", 0);
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
- calld->recv_initial_metadata = op->recv_initial_metadata;
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
/* substitute our callback for the higher callback */
- calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->on_initial_md_ready;
+ calld->ops_recv_initial_metadata_ready =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->on_initial_md_ready;
}
grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("lr_start_transport_stream_op", 0);
+ GPR_TIMER_END("lr_start_transport_stream_op_batch", 0);
}
const grpc_channel_filter grpc_load_reporting_filter = {
- lr_start_transport_stream_op,
+ lr_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
diff --git a/src/core/ext/load_reporting/load_reporting_filter.h b/src/core/ext/filters/load_reporting/load_reporting_filter.h
index 160ed32af9..da884a1479 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.h
+++ b/src/core/ext/filters/load_reporting/load_reporting_filter.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
-#define GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
+#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
-#include "src/core/ext/load_reporting/load_reporting.h"
+#include "src/core/ext/filters/load_reporting/load_reporting.h"
#include "src/core/lib/channel/channel_stack.h"
extern const grpc_channel_filter grpc_load_reporting_filter;
-#endif /* GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_FILTER_H */
+#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H */
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
new file mode 100644
index 0000000000..b03cb0ba0a
--- /dev/null
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -0,0 +1,419 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/message_size_filter.h"
+
+#include <limits.h>
+#include <string.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/surface/channel_init.h"
+#include "src/core/lib/transport/http2_errors.h"
+#include "src/core/lib/transport/service_config.h"
+
+#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX
+#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX
+#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX
+
+typedef struct channel_data {
+ /* We take a reference to the channel stack for the timer callback */
+ grpc_channel_stack* channel_stack;
+ /* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
+ and max_age_grace_timer_pending */
+ gpr_mu max_age_timer_mu;
+ /* True if the max_age timer callback is currently pending */
+ bool max_age_timer_pending;
+ /* True if the max_age_grace timer callback is currently pending */
+ bool max_age_grace_timer_pending;
+ /* The timer for checking if the channel has reached its max age */
+ grpc_timer max_age_timer;
+ /* The timer for checking if the max-aged channel has uesed up the grace
+ period */
+ grpc_timer max_age_grace_timer;
+ /* The timer for checking if the channel's idle duration reaches
+ max_connection_idle */
+ grpc_timer max_idle_timer;
+ /* Allowed max time a channel may have no outstanding rpcs */
+ gpr_timespec max_connection_idle;
+ /* Allowed max time a channel may exist */
+ gpr_timespec max_connection_age;
+ /* Allowed grace period after the channel reaches its max age */
+ gpr_timespec max_connection_age_grace;
+ /* Closure to run when the channel's idle duration reaches max_connection_idle
+ and should be closed gracefully */
+ grpc_closure close_max_idle_channel;
+ /* Closure to run when the channel reaches its max age and should be closed
+ gracefully */
+ grpc_closure close_max_age_channel;
+ /* Closure to run the channel uses up its max age grace time and should be
+ closed forcibly */
+ grpc_closure force_close_max_age_channel;
+ /* Closure to run when the init fo channel stack is done and the max_idle
+ timer should be started */
+ grpc_closure start_max_idle_timer_after_init;
+ /* Closure to run when the init fo channel stack is done and the max_age timer
+ should be started */
+ grpc_closure start_max_age_timer_after_init;
+ /* Closure to run when the goaway op is finished and the max_age_timer */
+ grpc_closure start_max_age_grace_timer_after_goaway_op;
+ /* Closure to run when the channel connectivity state changes */
+ grpc_closure channel_connectivity_changed;
+ /* Records the current connectivity state */
+ grpc_connectivity_state connectivity_state;
+ /* Number of active calls */
+ gpr_atm call_count;
+} channel_data;
+
+/* Increase the nubmer of active calls. Before the increasement, if there are no
+ calls, the max_idle_timer should be cancelled. */
+static void increase_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) {
+ if (gpr_atm_full_fetch_add(&chand->call_count, 1) == 0) {
+ grpc_timer_cancel(exec_ctx, &chand->max_idle_timer);
+ }
+}
+
+/* Decrease the nubmer of active calls. After the decrement, if there are no
+ calls, the max_idle_timer should be started. */
+static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) {
+ if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) {
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_idle_timer");
+ grpc_timer_init(
+ exec_ctx, &chand->max_idle_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_idle),
+ &chand->close_max_idle_channel, gpr_now(GPR_CLOCK_MONOTONIC));
+ }
+}
+
+static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ /* Decrease call_count. If there are no active calls at this time,
+ max_idle_timer will start here. If the number of active calls is not 0,
+ max_idle_timer will start after all the active calls end. */
+ decrease_call_count(exec_ctx, chand);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age start_max_idle_timer_after_init");
+}
+
+static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ chand->max_age_timer_pending = true;
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
+ grpc_timer_init(
+ exec_ctx, &chand->max_age_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_age),
+ &chand->close_max_age_channel, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op->connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(exec_ctx,
+ grpc_channel_stack_element(chand->channel_stack, 0), op);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age start_max_age_timer_after_init");
+}
+
+static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx,
+ void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ chand->max_age_grace_timer_pending = true;
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
+ grpc_timer_init(exec_ctx, &chand->max_age_grace_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ chand->max_connection_age_grace),
+ &chand->force_close_max_age_channel,
+ gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age start_max_age_grace_timer_after_goaway_op");
+}
+
+static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_atm_no_barrier_fetch_add(&chand->call_count, 1);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ op->goaway_error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_idle"),
+ GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR);
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(chand->channel_stack, 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+ } else if (error != GRPC_ERROR_CANCELLED) {
+ GRPC_LOG_IF_ERROR("close_max_idle_channel", error);
+ }
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age max_idle_timer");
+}
+
+static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ chand->max_age_timer_pending = false;
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ if (error == GRPC_ERROR_NONE) {
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack,
+ "max_age start_max_age_grace_timer_after_goaway_op");
+ grpc_transport_op* op = grpc_make_transport_op(
+ &chand->start_max_age_grace_timer_after_goaway_op);
+ op->goaway_error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"),
+ GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR);
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(chand->channel_stack, 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+ } else if (error != GRPC_ERROR_CANCELLED) {
+ GRPC_LOG_IF_ERROR("close_max_age_channel", error);
+ }
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age max_age_timer");
+}
+
+static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ chand->max_age_grace_timer_pending = false;
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ op->disconnect_with_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel reaches max age");
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(chand->channel_stack, 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+ } else if (error != GRPC_ERROR_CANCELLED) {
+ GRPC_LOG_IF_ERROR("force_close_max_age_channel", error);
+ }
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age max_age_grace_timer");
+}
+
+static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op->connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(
+ exec_ctx, grpc_channel_stack_element(chand->channel_stack, 0), op);
+ } else {
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ if (chand->max_age_timer_pending) {
+ grpc_timer_cancel(exec_ctx, &chand->max_age_timer);
+ chand->max_age_timer_pending = false;
+ }
+ if (chand->max_age_grace_timer_pending) {
+ grpc_timer_cancel(exec_ctx, &chand->max_age_grace_timer);
+ chand->max_age_grace_timer_pending = false;
+ }
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ /* If there are no active calls, this increasement will cancel
+ max_idle_timer, and prevent max_idle_timer from being started in the
+ future. */
+ increase_call_count(exec_ctx, chand);
+ }
+}
+
+/* Constructor for call_data. */
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ channel_data* chand = elem->channel_data;
+ increase_call_count(exec_ctx, chand);
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for call_data. */
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* ignored) {
+ channel_data* chand = elem->channel_data;
+ decrease_call_count(exec_ctx, chand);
+}
+
+/* Constructor for channel_data. */
+static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ channel_data* chand = elem->channel_data;
+ gpr_mu_init(&chand->max_age_timer_mu);
+ chand->max_age_timer_pending = false;
+ chand->max_age_grace_timer_pending = false;
+ chand->channel_stack = args->channel_stack;
+ chand->max_connection_age =
+ DEFAULT_MAX_CONNECTION_AGE_MS == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_MS, GPR_TIMESPAN);
+ chand->max_connection_age_grace =
+ DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_GRACE_MS,
+ GPR_TIMESPAN);
+ chand->max_connection_idle =
+ DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_IDLE_MS, GPR_TIMESPAN);
+ for (size_t i = 0; i < args->channel_args->num_args; ++i) {
+ if (0 == strcmp(args->channel_args->args[i].key,
+ GRPC_ARG_MAX_CONNECTION_AGE_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX});
+ chand->max_connection_age =
+ value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
+ } else if (0 == strcmp(args->channel_args->args[i].key,
+ GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0,
+ INT_MAX});
+ chand->max_connection_age_grace =
+ value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
+ } else if (0 == strcmp(args->channel_args->args[i].key,
+ GRPC_ARG_MAX_CONNECTION_IDLE_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX});
+ chand->max_connection_idle =
+ value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
+ }
+ }
+ grpc_closure_init(&chand->close_max_idle_channel, close_max_idle_channel,
+ chand, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->close_max_age_channel, close_max_age_channel, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->force_close_max_age_channel,
+ force_close_max_age_channel, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->start_max_idle_timer_after_init,
+ start_max_idle_timer_after_init, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->start_max_age_timer_after_init,
+ start_max_age_timer_after_init, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->start_max_age_grace_timer_after_goaway_op,
+ start_max_age_grace_timer_after_goaway_op, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand,
+ grpc_schedule_on_exec_ctx);
+
+ if (gpr_time_cmp(chand->max_connection_age, gpr_inf_future(GPR_TIMESPAN)) !=
+ 0) {
+ /* When the channel reaches its max age, we send down an op with
+ goaway_error set. However, we can't send down any ops until after the
+ channel stack is fully initialized. If we start the timer here, we have
+ no guarantee that the timer won't pop before channel stack initialization
+ is finished. To avoid that problem, we create a closure to start the
+ timer, and we schedule that closure to be run after call stack
+ initialization is done. */
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack,
+ "max_age start_max_age_timer_after_init");
+ grpc_closure_sched(exec_ctx, &chand->start_max_age_timer_after_init,
+ GRPC_ERROR_NONE);
+ }
+
+ /* Initialize the number of calls as 1, so that the max_idle_timer will not
+ start until start_max_idle_timer_after_init is invoked. */
+ gpr_atm_rel_store(&chand->call_count, 1);
+ if (gpr_time_cmp(chand->max_connection_idle, gpr_inf_future(GPR_TIMESPAN)) !=
+ 0) {
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack,
+ "max_age start_max_idle_timer_after_init");
+ grpc_closure_sched(exec_ctx, &chand->start_max_idle_timer_after_init,
+ GRPC_ERROR_NONE);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for channel_data. */
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {}
+
+const grpc_channel_filter grpc_max_age_filter = {
+ grpc_call_next_op,
+ grpc_channel_next_op,
+ 0, /* sizeof_call_data */
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "max_age"};
+
+static bool maybe_add_max_age_filter(grpc_exec_ctx* exec_ctx,
+ grpc_channel_stack_builder* builder,
+ void* arg) {
+ const grpc_channel_args* channel_args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ const grpc_arg* a =
+ grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS);
+ bool enable = false;
+ if (a != NULL && a->type == GRPC_ARG_INTEGER && a->value.integer != INT_MAX) {
+ enable = true;
+ }
+ a = grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS);
+ if (a != NULL && a->type == GRPC_ARG_INTEGER && a->value.integer != INT_MAX) {
+ enable = true;
+ }
+ if (enable) {
+ return grpc_channel_stack_builder_prepend_filter(
+ builder, &grpc_max_age_filter, NULL, NULL);
+ } else {
+ return true;
+ }
+}
+
+void grpc_max_age_filter_init(void) {
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_max_age_filter, NULL);
+}
+
+void grpc_max_age_filter_shutdown(void) {}
diff --git a/src/core/ext/filters/max_age/max_age_filter.h b/src/core/ext/filters/max_age/max_age_filter.h
new file mode 100644
index 0000000000..ed2015297a
--- /dev/null
+++ b/src/core/ext/filters/max_age/max_age_filter.h
@@ -0,0 +1,39 @@
+//
+// Copyright 2017, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+extern const grpc_channel_filter grpc_max_age_filter;
+
+#endif /* GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H */
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index 2b226c1bf7..e645eda7e3 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -41,9 +41,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/connector.h"
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h
index f5d1025432..d55f6ed669 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.h
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h
@@ -34,7 +34,7 @@
#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
-#include "src/core/ext/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/connector.h"
grpc_connector* grpc_chttp2_connector_create();
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 067ac35a5a..9c8505ddfa 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -38,8 +38,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/surface/api_trace.h"
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index f0c241d68e..119adfade1 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -38,9 +38,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
-#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 301140311a..965306cbbe 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -69,10 +69,16 @@
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
-#define DEFAULT_KEEPALIVE_TIME_SECOND INT_MAX
-#define DEFAULT_KEEPALIVE_TIMEOUT_SECOND 20
+#define DEFAULT_CLIENT_KEEPALIVE_TIME_S INT_MAX
+#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_S 20
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
+static int g_default_client_keepalive_time_s = DEFAULT_CLIENT_KEEPALIVE_TIME_S;
+static int g_default_client_keepalive_timeout_s =
+ DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_S;
+static bool g_default_keepalive_permit_without_calls =
+ DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
+
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;
@@ -147,6 +153,8 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_ping_type ping_type,
grpc_closure *on_initiate,
grpc_closure *on_complete);
+static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error);
#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
@@ -279,6 +287,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
+ t, grpc_combiner_scheduler(t->combiner, false));
grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false));
grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
@@ -357,15 +367,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* client-side keepalive setting */
t->keepalive_time =
- DEFAULT_KEEPALIVE_TIME_SECOND == INT_MAX
+ g_default_client_keepalive_time_s == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN);
+ : gpr_time_from_seconds(g_default_client_keepalive_time_s,
+ GPR_TIMESPAN);
t->keepalive_timeout =
- DEFAULT_KEEPALIVE_TIMEOUT_SECOND == INT_MAX
+ g_default_client_keepalive_timeout_s == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND,
+ : gpr_time_from_seconds(g_default_client_keepalive_timeout_s,
GPR_TIMESPAN);
- t->keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
+ t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls;
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
@@ -415,24 +426,25 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->enable_bdp_probe = grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){1, 0, 1});
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_KEEPALIVE_TIME)) {
+ GRPC_ARG_CLIENT_KEEPALIVE_TIME_S)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i],
- (grpc_integer_options){DEFAULT_KEEPALIVE_TIME_SECOND, 1, INT_MAX});
+ (grpc_integer_options){g_default_client_keepalive_time_s, 1,
+ INT_MAX});
t->keepalive_time = value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_seconds(value, GPR_TIMESPAN);
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT)) {
+ GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i],
- (grpc_integer_options){DEFAULT_KEEPALIVE_TIMEOUT_SECOND, 0,
+ (grpc_integer_options){g_default_client_keepalive_timeout_s, 0,
INT_MAX});
t->keepalive_timeout = value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_seconds(value, GPR_TIMESPAN);
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
+ GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
t->keepalive_permit_without_calls =
(uint32_t)grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){0, 0, 1});
@@ -486,6 +498,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ t->ping_state.is_delayed_ping_timer_set = false;
/** Start client-side keepalive pings */
if (t->is_client) {
@@ -1150,20 +1163,23 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
- grpc_transport_stream_op *op = stream_op;
- grpc_chttp2_transport *t = op->handler_private.args[0];
- grpc_chttp2_stream *s = op->handler_private.args[1];
+ grpc_transport_stream_op_batch *op = stream_op;
+ grpc_chttp2_stream *s = op->handler_private.extra_arg;
+ grpc_transport_stream_op_batch_payload *op_payload = op->payload;
+ grpc_chttp2_transport *t = s->t;
if (grpc_http_trace) {
- char *str = grpc_transport_stream_op_string(op);
+ char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
if (op->send_initial_metadata) {
- log_metadata(op->send_initial_metadata, s->id, t->is_client, true);
+ log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
+ s->id, t->is_client, true);
}
if (op->send_trailing_metadata) {
- log_metadata(op->send_trailing_metadata, s->id, t->is_client, false);
+ log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
+ s->id, t->is_client, false);
}
}
@@ -1178,23 +1194,25 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
- if (op->collect_stats != NULL) {
+ if (op->collect_stats) {
GPR_ASSERT(s->collecting_stats == NULL);
- s->collecting_stats = op->collect_stats;
+ s->collecting_stats = op_payload->collect_stats.collect_stats;
on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
}
- if (op->cancel_error != GRPC_ERROR_NONE) {
- grpc_chttp2_cancel_stream(exec_ctx, t, s, op->cancel_error);
+ if (op->cancel_stream) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s,
+ op_payload->cancel_stream.cancel_error);
}
- if (op->send_initial_metadata != NULL) {
+ if (op->send_initial_metadata) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
- s->send_initial_metadata = op->send_initial_metadata;
+ s->send_initial_metadata =
+ op_payload->send_initial_metadata.send_initial_metadata;
const size_t metadata_size =
- grpc_metadata_batch_size(op->send_initial_metadata);
+ grpc_metadata_batch_size(s->send_initial_metadata);
const size_t metadata_peer_limit =
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
@@ -1215,7 +1233,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
- if (contains_non_ok_status(op->send_initial_metadata)) {
+ if (contains_non_ok_status(s->send_initial_metadata)) {
s->seen_error = true;
}
if (!s->write_closed) {
@@ -1233,8 +1251,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ grpc_chttp2_stream_write_type write_type =
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
+ if (op->send_message &&
+ (op->payload->send_message.send_message->flags &
+ GRPC_WRITE_BUFFER_HINT)) {
+ write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
+ }
+ grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
"op.send_initial_metadata");
}
} else {
@@ -1249,7 +1273,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->send_message != NULL) {
+ if (op->send_message) {
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
@@ -1263,14 +1287,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->fetching_send_message == NULL);
uint8_t *frame_hdr =
grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
- uint32_t flags = op->send_message->flags;
+ uint32_t flags = op_payload->send_message.send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
- size_t len = op->send_message->length;
+ size_t len = op_payload->send_message.send_message->length;
frame_hdr[1] = (uint8_t)(len >> 24);
frame_hdr[2] = (uint8_t)(len >> 16);
frame_hdr[3] = (uint8_t)(len >> 8);
frame_hdr[4] = (uint8_t)(len);
- s->fetching_send_message = op->send_message;
+ s->fetching_send_message = op_payload->send_message.send_message;
s->fetched_send_message_length = 0;
s->next_message_end_offset = s->flow_controlled_bytes_written +
(int64_t)s->flow_controlled_buffer.length +
@@ -1287,14 +1311,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->send_trailing_metadata != NULL) {
+ if (op->send_trailing_metadata) {
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
- s->send_trailing_metadata = op->send_trailing_metadata;
+ s->send_trailing_metadata =
+ op_payload->send_trailing_metadata.send_trailing_metadata;
s->write_buffering = false;
const size_t metadata_size =
- grpc_metadata_batch_size(op->send_trailing_metadata);
+ grpc_metadata_batch_size(s->send_trailing_metadata);
const size_t metadata_peer_limit =
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
@@ -1311,14 +1336,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
- if (contains_non_ok_status(op->send_trailing_metadata)) {
+ if (contains_non_ok_status(s->send_trailing_metadata)) {
s->seen_error = true;
}
if (s->write_closed) {
s->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_trailing_metadata_finished,
- grpc_metadata_batch_is_empty(op->send_trailing_metadata)
+ grpc_metadata_batch_is_empty(
+ op->payload->send_trailing_metadata.send_trailing_metadata)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to send trailing metadata after "
@@ -1334,27 +1360,30 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->recv_initial_metadata != NULL) {
+ if (op->recv_initial_metadata) {
GPR_ASSERT(s->recv_initial_metadata_ready == NULL);
- s->recv_initial_metadata_ready = op->recv_initial_metadata_ready;
- s->recv_initial_metadata = op->recv_initial_metadata;
+ s->recv_initial_metadata_ready =
+ op_payload->recv_initial_metadata.recv_initial_metadata_ready;
+ s->recv_initial_metadata =
+ op_payload->recv_initial_metadata.recv_initial_metadata;
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
}
- if (op->recv_message != NULL) {
+ if (op->recv_message) {
GPR_ASSERT(s->recv_message_ready == NULL);
- s->recv_message_ready = op->recv_message_ready;
- s->recv_message = op->recv_message;
+ s->recv_message_ready = op_payload->recv_message.recv_message_ready;
+ s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0 && s->frame_storage.length == 0) {
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
- if (op->recv_trailing_metadata != NULL) {
+ if (op->recv_trailing_metadata) {
GPR_ASSERT(s->recv_trailing_metadata_finished == NULL);
s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
- s->recv_trailing_metadata = op->recv_trailing_metadata;
+ s->recv_trailing_metadata =
+ op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
@@ -1367,19 +1396,19 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_stream *gs,
+ grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
if (grpc_http_trace) {
- char *str = grpc_transport_stream_op_string(op);
+ char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op[s=%p/%d]: %s", s, s->id, str);
gpr_free(str);
}
- op->handler_private.args[0] = gt;
- op->handler_private.args[1] = gs;
+ op->handler_private.extra_arg = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
grpc_closure_sched(
exec_ctx,
@@ -1416,6 +1445,13 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ t->ping_state.is_delayed_ping_timer_set = false;
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "retry_send_ping");
+}
+
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint64_t id) {
grpc_chttp2_ping_queue *pq =
@@ -1449,7 +1485,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
grpc_transport_op *op = stream_op;
- grpc_chttp2_transport *t = op->transport_private.args[0];
+ grpc_chttp2_transport *t = op->handler_private.extra_arg;
grpc_error *close_transport = op->disconnect_with_error;
if (op->on_connectivity_state_change != NULL) {
@@ -1495,10 +1531,10 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
char *msg = grpc_transport_op_string(op);
gpr_free(msg);
- op->transport_private.args[0] = gt;
+ op->handler_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
grpc_closure_sched(
- exec_ctx, grpc_closure_init(&op->transport_private.closure,
+ exec_ctx, grpc_closure_init(&op->handler_private.closure,
perform_transport_op_locked, op,
grpc_combiner_scheduler(t->combiner, false)),
GRPC_ERROR_NONE);
@@ -1880,7 +1916,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
size_t msg_len = GRPC_SLICE_LENGTH(slice);
GPR_ASSERT(msg_len <= UINT32_MAX);
- uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 0);
+ uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
message_pfx = grpc_slice_malloc(14 + msg_len_len);
p = GRPC_SLICE_START_PTR(message_pfx);
*p++ = 0x00; /* literal header, not indexed */
@@ -1897,7 +1933,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
*p++ = 'a';
*p++ = 'g';
*p++ = 'e';
- GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 0, 0, p, (uint32_t)msg_len_len);
+ GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
p += msg_len_len;
GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx);
@@ -2148,6 +2184,32 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
}
+void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args) {
+ size_t i;
+ if (args) {
+ for (i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_CLIENT_KEEPALIVE_TIME_S)) {
+ g_default_client_keepalive_time_s = grpc_channel_arg_get_integer(
+ &args->args[i], (grpc_integer_options){
+ g_default_client_keepalive_time_s, 1, INT_MAX});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S)) {
+ g_default_client_keepalive_timeout_s = grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){g_default_client_keepalive_timeout_s, 0,
+ INT_MAX});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
+ g_default_keepalive_permit_without_calls =
+ (uint32_t)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){g_default_keepalive_permit_without_calls,
+ 0, 1});
+ }
+ }
+ }
+}
+
static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
@@ -2191,9 +2253,7 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
- grpc_closure_create(init_keepalive_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false)),
- gpr_now(GPR_CLOCK_MONOTONIC));
+ &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end");
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index de8462a17e..46dafdb62f 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -40,6 +40,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+static bool g_disable_ping_ack = false;
+
grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) {
grpc_slice slice = grpc_slice_malloc(9 + 8);
uint8_t *p = GRPC_SLICE_START_PTR(slice);
@@ -101,15 +103,21 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
- if (t->ping_ack_count == t->ping_ack_capacity) {
- t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3);
- t->ping_acks = gpr_realloc(
- t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
+ if (!g_disable_ping_ack) {
+ if (t->ping_ack_count == t->ping_ack_capacity) {
+ t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3);
+ t->ping_acks = gpr_realloc(
+ t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
+ }
+ t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response");
}
- t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response");
}
}
return GRPC_ERROR_NONE;
}
+
+void grpc_set_disable_ping_ack(bool disable_ping_ack) {
+ g_disable_ping_ack = disable_ping_ack;
+}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h
index ef642465d7..01983d2b12 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.h
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.h
@@ -53,4 +53,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last);
+/* Test-only function for disabling ping ack */
+void grpc_set_disable_ping_ack(bool disable_ping_ack);
+
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H */
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b4938dc281..0fef7a574a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -102,6 +102,8 @@ typedef struct {
typedef struct {
gpr_timespec last_ping_sent_time;
int pings_before_data_required;
+ grpc_timer delayed_ping_timer;
+ bool is_delayed_ping_timer_set;
} grpc_chttp2_repeated_ping_state;
/* deframer state for the overall http2 stream of bytes */
@@ -306,6 +308,7 @@ struct grpc_chttp2_transport {
grpc_chttp2_repeated_ping_policy ping_policy;
grpc_chttp2_repeated_ping_state ping_state;
uint64_t ping_ctr; /* unique id for pings */
+ grpc_closure retry_initiate_ping_locked;
/** ping acks */
size_t ping_ack_count;
@@ -447,7 +450,6 @@ struct grpc_chttp2_stream {
int64_t next_message_end_offset;
int64_t flow_controlled_bytes_written;
bool complete_fetch_covered_by_poller;
- grpc_closure complete_fetch;
grpc_closure complete_fetch_locked;
grpc_closure *fetching_send_message_finished;
@@ -832,4 +834,8 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t);
+/** Set the default keepalive configurations, must only be called at
+ initialization */
+void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args);
+
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 2b9d93cae7..0869056f56 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -101,6 +101,14 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
"Ping delayed [%p]: not enough time elapsed since last ping",
t->peer_string);
}
+ if (!t->ping_state.is_delayed_ping_timer_set) {
+ t->ping_state.is_delayed_ping_timer_set = true;
+ grpc_timer_init(exec_ctx, &t->ping_state.delayed_ping_timer,
+ gpr_time_add(t->ping_state.last_ping_sent_time,
+ t->ping_policy.min_time_between_pings),
+ &t->retry_initiate_ping_locked,
+ gpr_now(GPR_CLOCK_MONOTONIC));
+ }
return;
}
/* coalesce equivalent pings into this one */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index c732207216..c515cba2ca 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -172,7 +172,7 @@ struct op_state {
};
struct op_and_state {
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
struct op_state state;
bool done;
struct stream_obj *s; /* Pointer back to the stream object */
@@ -187,7 +187,7 @@ struct op_storage {
struct stream_obj {
gpr_arena *arena;
struct op_and_state *oas;
- grpc_transport_stream_op *curr_op;
+ grpc_transport_stream_op_batch *curr_op;
grpc_cronet_transport *curr_ct;
grpc_stream *curr_gs;
bidirectional_stream *cbs;
@@ -298,12 +298,13 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) {
/*
Add a new stream op to op storage.
*/
-static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
+static void add_to_storage(struct stream_obj *s,
+ grpc_transport_stream_op_batch *op) {
struct op_storage *storage = &s->storage;
/* add new op at the beginning of the linked list. The memory is freed
in remove_from_storage */
struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state));
- memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op));
+ memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch));
memset(&new_op->state, 0, sizeof(new_op->state));
new_op->s = s;
new_op->done = false;
@@ -768,7 +769,7 @@ static bool header_has_authority(grpc_linked_mdelem *head) {
Op Execution: Decide if one of the actions contained in the stream op can be
executed. This is the heart of the state machine.
*/
-static bool op_can_be_run(grpc_transport_stream_op *curr_op,
+static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
struct stream_obj *s, struct op_state *op_state,
enum e_op_id op_id) {
struct op_state *stream_state = &s->state;
@@ -919,7 +920,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
*/
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas) {
- grpc_transport_stream_op *stream_op = &oas->op;
+ grpc_transport_stream_op_batch *stream_op = &oas->op;
struct stream_obj *s = oas->s;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
struct op_state *stream_state = &s->state;
@@ -941,9 +942,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
char *url = NULL;
const char *method = "POST";
s->header_array.headers = NULL;
- convert_metadata_to_cronet_headers(
- stream_op->send_initial_metadata->list.head, t->host, &url,
- &s->header_array.headers, &s->header_array.count, &method);
+ convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata
+ .send_initial_metadata->list.head,
+ t->host, &url, &s->header_array.headers,
+ &s->header_array.count, &method);
s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
@@ -971,13 +973,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
- if (1 != grpc_byte_stream_next(exec_ctx, stream_op->send_message,
- stream_op->send_message->length, NULL)) {
+ if (1 != grpc_byte_stream_next(exec_ctx, stream_op->payload->send_message.send_message,
+ stream_op->payload->send_message.send_message->length, NULL)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx, stream_op->send_message, &slice)) {
+ grpc_byte_stream_pull(exec_ctx, stream_op->payload->send_message.send_message, &slice)) {
/* Should never reach here */
GPR_ASSERT(false);
}
@@ -990,7 +992,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
- &write_buffer_size, stream_op->send_message->flags);
+ &write_buffer_size,
+ stream_op->payload->send_message.send_message->flags);
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
@@ -1037,20 +1040,28 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
} else if (stream_state->state_callback_received[OP_FAILED]) {
- grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
} else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
- grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &oas->s->state.rs.initial_metadata,
- stream_op->recv_initial_metadata);
- grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata);
+ grpc_closure_sched(
+ exec_ctx,
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -1059,27 +1070,31 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1118,8 +1133,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx, stream_op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1171,7 +1187,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1195,12 +1212,12 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &oas->s->state.rs.trailing_metadata,
- stream_op->recv_trailing_metadata);
+ stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
- } else if (stream_op->cancel_error &&
+ } else if (stream_op->cancel_stream &&
op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
@@ -1212,7 +1229,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
stream_state->state_op_done[OP_CANCEL_ERROR] = true;
if (!stream_state->cancel_error) {
- stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error);
+ stream_state->cancel_error =
+ GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
}
} else if (stream_op->on_complete &&
op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
@@ -1291,18 +1309,22 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set) {}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_stream *gs,
+ grpc_transport_stream_op_batch *op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
if (op->send_initial_metadata &&
- header_has_authority(op->send_initial_metadata->list.head)) {
+ header_has_authority(op->payload->send_initial_metadata
+ .send_initial_metadata->list.head)) {
/* Cronet does not support :authority header field. We cancel the call when
this field is present in metadata */
- if (op->recv_initial_metadata_ready) {
- grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
- GRPC_ERROR_CANCELLED);
+ if (op->recv_initial_metadata) {
+ grpc_closure_sched(
+ exec_ctx,
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED);
}
- if (op->recv_message_ready) {
- grpc_closure_sched(exec_ctx, op->recv_message_ready,
+ if (op->recv_message) {
+ grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED);