aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-03 12:56:40 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-05-03 12:56:40 -0700
commit7c6ba9bae47bdeb6f532b457d9848e7dc94c8e14 (patch)
treedd56d6b41d03feedf1f51420507b958af7473c1a /src/core
parent27f59afecb04b63b7c83a842d400efb1ad09049a (diff)
parent6bac7d3467c99dccc30e8447bc84237bb54b99fe (diff)
Merge github.com:grpc/grpc into error
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/client_config/client_channel.c5
-rw-r--r--src/core/ext/client_config/subchannel.c8
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c736
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h74
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c21
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c8
-rw-r--r--src/core/lib/channel/channel_stack.c6
-rw-r--r--src/core/lib/channel/channel_stack.h11
-rw-r--r--src/core/lib/channel/compress_filter.c26
-rw-r--r--src/core/lib/channel/compress_filter.h2
-rw-r--r--src/core/lib/channel/connected_channel.c7
-rw-r--r--src/core/lib/channel/http_client_filter.c4
-rw-r--r--src/core/lib/channel/http_server_filter.c17
-rw-r--r--src/core/lib/http/parser.c35
-rw-r--r--src/core/lib/http/parser.h1
-rw-r--r--src/core/lib/iomgr/ev_posix.c1
-rw-r--r--src/core/lib/iomgr/iomgr.c6
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c6
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c29
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c28
-rw-r--r--src/core/lib/iomgr/tcp_windows.c16
-rw-r--r--src/core/lib/security/client_auth_filter.c4
-rw-r--r--src/core/lib/security/server_auth_filter.c4
-rw-r--r--src/core/lib/support/env_win32.c44
-rw-r--r--src/core/lib/support/log_linux.c4
-rw-r--r--src/core/lib/support/log_win32.c18
-rw-r--r--src/core/lib/support/string_util_win32.c94
-rw-r--r--src/core/lib/support/string_win32.c32
-rw-r--r--src/core/lib/support/time_win32.c4
-rw-r--r--src/core/lib/support/tmpfile_msys.c73
-rw-r--r--src/core/lib/support/tmpfile_posix.c4
-rw-r--r--src/core/lib/support/tmpfile_win32.c4
-rw-r--r--src/core/lib/surface/call.c42
-rw-r--r--src/core/lib/surface/completion_queue.c4
-rw-r--r--src/core/lib/surface/init.c3
-rw-r--r--src/core/lib/surface/lame_client.c6
-rw-r--r--src/core/lib/surface/server.c4
-rw-r--r--src/core/lib/transport/metadata.c29
-rw-r--r--src/core/lib/transport/transport.c5
-rw-r--r--src/core/lib/transport/transport.h12
-rw-r--r--src/core/lib/transport/transport_impl.h2
42 files changed, 969 insertions, 474 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index abfb3bb5f0..5e278ef127 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -134,7 +134,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
}
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+ grpc_call_element *elem, void *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
@@ -152,7 +152,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
}
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+ grpc_call_element *elem, void *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 8a98a6bcbe..9b5a078aec 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -415,9 +415,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *and_free_memory) {
grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
+ gpr_free(and_free_memory);
}
/* Constructor for channel_data */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index c925c28c67..bd45d3825c 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -268,7 +268,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
- gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef);
+ gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
}
gpr_mu_unlock(&c->mu);
}
@@ -644,9 +644,9 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
bool success) {
grpc_subchannel_call *c = call;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
- grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call");
- gpr_free(c);
+ grpc_connected_subchannel *connection = c->connection;
+ grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 01507f5ca6..8c8593748d 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -81,27 +81,25 @@ int grpc_flowctl_trace = 0;
static const grpc_transport_vtable vtable;
-static void lock(grpc_chttp2_transport *t);
-static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
-
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
bool iomgr_success_ignored);
+static void reading_action(grpc_exec_ctx *exec_ctx, void *t,
+ bool iomgr_success_ignored);
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *t,
+ bool iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value);
-/** Endpoint callback to process incoming data */
-static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success);
-
/** Start disconnection chain */
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
/** Perform a transport_op */
-static void perform_stream_op_locked(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *transport_op);
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
@@ -118,15 +116,19 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_pollset *pollset);
+ grpc_chttp2_stream *s_ignored, void *pollset);
static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_pollset_set *pollset_set);
+ grpc_chttp2_stream *s_ignored,
+ void *pollset_set);
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
+static void finish_global_actions(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state, const char *reason);
@@ -138,7 +140,10 @@ static void incoming_byte_stream_update_flow_control(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
-
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *byte_stream);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global);
@@ -150,7 +155,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
size_t i;
- gpr_mu_lock(&t->mu);
+ gpr_mu_lock(&t->executor.mu);
GPR_ASSERT(t->ep == NULL);
@@ -176,8 +181,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- gpr_mu_unlock(&t->mu);
- gpr_mu_destroy(&t->mu);
+ gpr_mu_unlock(&t->executor.mu);
+ gpr_mu_destroy(&t->executor.mu);
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
@@ -238,7 +243,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_ref_init(&t->refs, 2);
/* ref is dropped at transport close() */
gpr_ref_init(&t->shutdown_ep_refs, 1);
- gpr_mu_init(&t->mu);
+ gpr_mu_init(&t->executor.mu);
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->global.next_stream_id = is_client ? 1 : 2;
@@ -262,6 +267,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_slice_buffer_init(&t->writing.outbuf);
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
grpc_closure_init(&t->writing_action, writing_action, t);
+ grpc_closure_init(&t->reading_action, reading_action, t);
+ grpc_closure_init(&t->parsing_action, parsing_action, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
@@ -269,7 +276,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
&t->writing);
- grpc_closure_init(&t->recv_data, recv_data, t);
gpr_slice_buffer_init(&t->read_buffer);
if (is_client) {
@@ -377,14 +383,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
-static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-
- lock(t);
+static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *arg_ignored) {
t->destroying = 1;
drop_connection(exec_ctx, t);
- unlock(exec_ctx, t);
+}
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked,
+ NULL, 0);
UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
@@ -404,17 +414,6 @@ static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
}
}
-static void allow_endpoint_shutdown_unlocked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- if (gpr_unref(&t->shutdown_ep_refs)) {
- gpr_mu_lock(&t->mu);
- if (t->ep) {
- grpc_endpoint_shutdown(exec_ctx, t->ep);
- }
- gpr_mu_unlock(&t->mu);
- }
-}
-
static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_endpoint_destroy(exec_ctx, t->ep);
@@ -424,7 +423,9 @@ static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
}
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *arg_ignored) {
if (!t->closed) {
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE,
@@ -464,6 +465,13 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
}
#endif
+static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *arg_ignored) {
+ grpc_chttp2_register_stream(t, s);
+}
+
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
@@ -473,6 +481,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(s, 0, sizeof(*s));
s->refcount = refcount;
+ /* We reserve one 'active stream' that's dropped when the stream is
+ read-closed. The others are for incoming_byte_streams that are actively
+ reading */
+ gpr_ref_init(&s->global.active_streams, 1);
GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]);
@@ -486,10 +498,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
REF_TRANSPORT(t, "stream");
- lock(t);
- grpc_chttp2_register_stream(t, s);
if (server_data) {
- GPR_ASSERT(t->parsing_active);
+ GPR_ASSERT(t->executor.parsing_active);
s->global.id = (uint32_t)(uintptr_t)server_data;
s->parsing.id = s->global.id;
s->global.outgoing_window =
@@ -502,40 +512,42 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
s->global.in_stream_map = 1;
}
- unlock(exec_ctx, t);
+
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked,
+ NULL, 0);
return 0;
}
-static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- int i;
+static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *arg) {
grpc_byte_stream *bs;
GPR_TIMER_BEGIN("destroy_stream", 0);
- gpr_mu_lock(&t->mu);
-
GPR_ASSERT((s->global.write_closed && s->global.read_closed) ||
s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
- close_transport_locked(exec_ctx, t);
+ close_transport_locked(exec_ctx, t, NULL, NULL);
}
- if (!t->parsing_active && s->global.id) {
+ if (!t->executor.parsing_active && s->global.id) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
s->global.id) == NULL);
}
+ while (
+ (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ }
+
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
&s->global);
grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
+ grpc_chttp2_list_remove_check_read_ops(&t->global, &s->global);
- gpr_mu_unlock(&t->mu);
-
- for (i = 0; i < STREAM_LIST_COUNT; i++) {
+ for (int i = 0; i < STREAM_LIST_COUNT; i++) {
if (s->included[i]) {
gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
t->global.is_client ? "client" : "server", s->global.id, i);
@@ -543,11 +555,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
}
- while (
- (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
- grpc_byte_stream_destroy(exec_ctx, bs);
- }
-
GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
GPR_ASSERT(s->global.send_message_finished == NULL);
GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
@@ -566,6 +573,17 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
UNREF_TRANSPORT(exec_ctx, t, "stream");
GPR_TIMER_END("destroy_stream", 0);
+
+ gpr_free(arg);
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, void *and_free_memory) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
+ and_free_memory, 0);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -594,28 +612,96 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
* LOCK MANAGEMENT
*/
-/* We take a grpc_chttp2_transport-global lock in response to calls coming in
- from above,
- and in response to data being received from below. New data to be written
- is always queued, as are callbacks to process data. During unlock() we
- check our todo lists and initiate callbacks and flush writes. */
-
-static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
-
-static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- GPR_TIMER_BEGIN("unlock", 0);
- if (!t->writing_active && !t->closed &&
- grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
- t->parsing_active)) {
- t->writing_active = 1;
- REF_TRANSPORT(t, "writing");
- grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL);
- prevent_endpoint_shutdown(t);
+static void finish_global_actions(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ grpc_chttp2_executor_action_header *hdr;
+ grpc_chttp2_executor_action_header *next;
+
+ for (;;) {
+ if (!t->executor.writing_active && !t->closed &&
+ grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
+ t->executor.parsing_active)) {
+ t->executor.writing_active = 1;
+ REF_TRANSPORT(t, "writing");
+ prevent_endpoint_shutdown(t);
+ grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL);
+ }
+ check_read_ops(exec_ctx, &t->global);
+
+ gpr_mu_lock(&t->executor.mu);
+ if (t->executor.pending_actions_head != NULL) {
+ hdr = t->executor.pending_actions_head;
+ t->executor.pending_actions_head = t->executor.pending_actions_tail =
+ NULL;
+ gpr_mu_unlock(&t->executor.mu);
+ while (hdr != NULL) {
+ hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
+ next = hdr->next;
+ gpr_free(hdr);
+ UNREF_TRANSPORT(exec_ctx, t, "pending_action");
+ hdr = next;
+ }
+ continue;
+ } else {
+ t->executor.global_active = false;
+ }
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ }
+}
+
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *optional_stream,
+ grpc_chttp2_locked_action action,
+ void *arg, size_t sizeof_arg) {
+ grpc_chttp2_executor_action_header *hdr;
+
+ REF_TRANSPORT(t, "run_global");
+ gpr_mu_lock(&t->executor.mu);
+
+ for (;;) {
+ if (!t->executor.global_active) {
+ t->executor.global_active = 1;
+ gpr_mu_unlock(&t->executor.mu);
+
+ action(exec_ctx, t, optional_stream, arg);
+
+ finish_global_actions(exec_ctx, t);
+ } else {
+ gpr_mu_unlock(&t->executor.mu);
+
+ hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg);
+ hdr->stream = optional_stream;
+ hdr->action = action;
+ if (sizeof_arg == 0) {
+ hdr->arg = arg;
+ } else {
+ hdr->arg = hdr + 1;
+ memcpy(hdr->arg, arg, sizeof_arg);
+ }
+
+ gpr_mu_lock(&t->executor.mu);
+ if (!t->executor.global_active) {
+ /* global lock was released while allocating memory: release & retry */
+ gpr_free(hdr);
+ continue;
+ }
+ hdr->next = NULL;
+ if (t->executor.pending_actions_head != NULL) {
+ t->executor.pending_actions_tail =
+ t->executor.pending_actions_tail->next = hdr;
+ } else {
+ t->executor.pending_actions_tail = t->executor.pending_actions_head =
+ hdr;
+ }
+ REF_TRANSPORT(t, "pending_action");
+ gpr_mu_unlock(&t->executor.mu);
+ }
+ break;
}
- check_read_ops(exec_ctx, &t->global);
- gpr_mu_unlock(&t->mu);
- GPR_TIMER_END("unlock", 0);
+ UNREF_TRANSPORT(exec_ctx, t, "run_global");
}
/*******************************************************************************
@@ -645,15 +731,11 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
-void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
- void *transport_writing_ptr, bool success) {
- grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
- grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
- grpc_chttp2_stream_global *stream_global;
-
- GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
-
- lock(t);
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *a) {
+ bool success = (bool)(uintptr_t)a;
allow_endpoint_shutdown_locked(exec_ctx, t);
@@ -663,24 +745,30 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
+ grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
&stream_global)) {
fail_pending_writes(exec_ctx, stream_global);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
}
- /* leave the writing flag up on shutdown to prevent further writes in unlock()
+ /* leave the writing flag up on shutdown to prevent further writes in
+ unlock()
from starting */
- t->writing_active = 0;
+ t->executor.writing_active = 0;
if (t->ep && !t->endpoint_reading) {
destroy_endpoint(exec_ctx, t);
}
- unlock(exec_ctx, t);
-
UNREF_TRANSPORT(exec_ctx, t, "writing");
+}
- GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
+void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
+ void *transport_writing, bool success) {
+ grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL,
+ terminate_writing_with_lock,
+ (void *)(uintptr_t)success, 0);
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
@@ -806,14 +894,16 @@ static int contains_non_ok_status(
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {}
-static void perform_stream_op_locked(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
- grpc_closure *on_complete;
-
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *stream_op) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
- on_complete = op->on_complete;
+ grpc_transport_stream_op *op = stream_op;
+ grpc_chttp2_transport_global *transport_global = &t->global;
+ grpc_chttp2_stream_global *stream_global = &s->global;
+
+ grpc_closure *on_complete = op->on_complete;
if (on_complete == NULL) {
on_complete = grpc_closure_create(do_nothing, NULL);
}
@@ -938,10 +1028,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
-
- lock(t);
- perform_stream_op_locked(exec_ctx, &t->global, &s->global, op);
- unlock(exec_ctx, t);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op,
+ sizeof(*op));
}
static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
@@ -961,13 +1049,10 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
}
-void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing,
- const uint8_t *opaque_8bytes) {
+static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *opaque_8bytes) {
grpc_chttp2_outstanding_ping *ping;
- grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
grpc_chttp2_transport_global *transport_global = &t->global;
- lock(t);
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
@@ -978,13 +1063,31 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
break;
}
}
- unlock(exec_ctx, t);
+}
+
+void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ const uint8_t *opaque_8bytes) {
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL,
+ ack_ping_locked, (void *)opaque_8bytes, 8);
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_transport_op *op) {
- bool close_transport = false;
+ grpc_chttp2_stream *s_unused,
+ void *stream_op) {
+ grpc_transport_op *op = stream_op;
+ bool close_transport = op->disconnect;
+
+ /* If there's a set_accept_stream ensure that we're not parsing
+ to avoid changing things out from underneath */
+ if (t->executor.parsing_active && op->set_accept_stream) {
+ GPR_ASSERT(t->post_parsing_op == NULL);
+ t->post_parsing_op = gpr_malloc(sizeof(*op));
+ memcpy(t->post_parsing_op, op, sizeof(*op));
+ return;
+ }
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
@@ -1010,47 +1113,31 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->bind_pollset) {
- add_to_pollset_locked(exec_ctx, t, op->bind_pollset);
+ add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset);
}
if (op->bind_pollset_set) {
- add_to_pollset_set_locked(exec_ctx, t, op->bind_pollset_set);
+ add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set);
}
if (op->send_ping) {
send_ping_locked(t, op->send_ping);
}
- if (op->disconnect) {
- close_transport_locked(exec_ctx, t);
- }
-
if (close_transport) {
- close_transport_locked(exec_ctx, t);
+ close_transport_locked(exec_ctx, t, NULL, NULL);
}
}
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-
- lock(t);
-
- /* If there's a set_accept_stream ensure that we're not parsing
- to avoid changing things out from underneath */
- if (t->parsing_active && op->set_accept_stream) {
- GPR_ASSERT(t->post_parsing_op == NULL);
- t->post_parsing_op = gpr_malloc(sizeof(*op));
- memcpy(t->post_parsing_op, op, sizeof(*op));
- } else {
- perform_transport_op_locked(exec_ctx, t, op);
- }
-
- unlock(exec_ctx, t);
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op));
}
/*******************************************************************************
- * INPUT PROCESSING
+ * INPUT PROCESSING - GENERAL
*/
static void check_read_ops(grpc_exec_ctx *exec_ctx,
@@ -1072,7 +1159,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
while (stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- grpc_byte_stream_destroy(exec_ctx, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->incoming_frames.head != NULL) {
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
@@ -1093,9 +1180,9 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
while (stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- grpc_byte_stream_destroy(exec_ctx, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
- if (stream_global->incoming_frames.head == NULL) {
+ if (stream_global->all_incoming_byte_streams_finished) {
grpc_chttp2_incoming_metadata_buffer_publish(
&stream_global->received_trailing_metadata,
stream_global->recv_trailing_metadata);
@@ -1107,6 +1194,15 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
}
+static void decrement_active_streams_locked(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ if ((stream_global->all_incoming_byte_streams_finished =
+ gpr_unref(&stream_global->active_streams))) {
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+}
+
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id) {
size_t new_stream_count;
@@ -1128,7 +1224,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
- close_transport_locked(exec_ctx, t);
+ close_transport_locked(exec_ctx, t, NULL, NULL);
}
if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
@@ -1229,10 +1325,11 @@ void grpc_chttp2_mark_stream_closed(
stream_global->read_closed = 1;
stream_global->published_initial_metadata = 1;
stream_global->published_trailing_metadata = 1;
+ decrement_active_streams_locked(exec_ctx, transport_global, stream_global);
}
if (close_writes && !stream_global->write_closed) {
stream_global->write_closed = 1;
- if (TRANSPORT_FROM_GLOBAL(transport_global)->writing_active) {
+ if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) {
GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes");
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global,
stream_global);
@@ -1242,7 +1339,7 @@ void grpc_chttp2_mark_stream_closed(
}
if (stream_global->read_closed && stream_global->write_closed) {
if (stream_global->id != 0 &&
- TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) {
+ TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
stream_global);
} else {
@@ -1374,7 +1471,7 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
}
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- close_transport_locked(exec_ctx, t);
+ close_transport_locked(exec_ctx, t, NULL, NULL);
end_all_the_calls(exec_ctx, t);
}
@@ -1398,102 +1495,136 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
}
}
-static void read_error_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- t->endpoint_reading = 0;
- if (!t->writing_active && t->ep) {
- destroy_endpoint(exec_ctx, t);
- }
-}
+/*******************************************************************************
+ * INPUT PROCESSING - PARSING
+ */
-/* tcp read callback */
-static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
- size_t i;
- int keep_reading = 0;
- grpc_chttp2_transport *t = tp;
+static void reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success);
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
+
+static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
+ /* Control flow:
+ reading_action_locked ->
+ (parse_unlocked -> post_parse_locked)? ->
+ post_reading_action_locked */
+ grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked,
+ (void *)(uintptr_t)success, 0);
+}
+
+static void reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg) {
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
- grpc_chttp2_stream_global *stream_global;
+ bool success = (bool)(uintptr_t)arg;
- GPR_TIMER_BEGIN("recv_data", 0);
-
- lock(t);
- i = 0;
- GPR_ASSERT(!t->parsing_active);
+ GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
- t->parsing_active = 1;
+ t->executor.parsing_active = 1;
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
- gpr_mu_unlock(&t->mu);
- GPR_TIMER_BEGIN("recv_data.parse", 0);
- for (; i < t->read_buffer.count &&
- grpc_chttp2_perform_read(exec_ctx, transport_parsing,
- t->read_buffer.slices[i]);
- i++)
- ;
- GPR_TIMER_END("recv_data.parse", 0);
- gpr_mu_lock(&t->mu);
- /* copy parsing qbuf to global qbuf */
- gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
- if (i != t->read_buffer.count) {
- unlock(exec_ctx, t);
- lock(t);
- drop_connection(exec_ctx, t);
- }
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- transport_global->concurrent_stream_count =
- (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
- if (transport_parsing->initial_window_update != 0) {
- grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
- update_global_window, t);
- transport_parsing->initial_window_update = 0;
- }
- /* handle higher level things */
- grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
- t->parsing_active = 0;
- /* handle delayed transport ops (if there is one) */
- if (t->post_parsing_op) {
- grpc_transport_op *op = t->post_parsing_op;
- t->post_parsing_op = NULL;
- perform_transport_op_locked(exec_ctx, t, op);
- gpr_free(op);
- }
- /* if a stream is in the stream map, and gets cancelled, we need to ensure
- * we are not parsing before continuing the cancellation to keep things in
- * a sane state */
- while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
- &stream_global)) {
- GPR_ASSERT(stream_global->in_stream_map);
- GPR_ASSERT(stream_global->write_closed);
- GPR_ASSERT(stream_global->read_closed);
- remove_stream(exec_ctx, t, stream_global->id);
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
- }
+ grpc_exec_ctx_enqueue(exec_ctx, &t->parsing_action, success, NULL);
+ } else {
+ post_reading_action_locked(exec_ctx, t, s_unused, arg);
+ }
+}
+
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+ grpc_chttp2_transport *t = arg;
+ GPR_TIMER_BEGIN("reading_action.parse", 0);
+ size_t i = 0;
+ for (; i < t->read_buffer.count &&
+ grpc_chttp2_perform_read(exec_ctx, &t->parsing,
+ t->read_buffer.slices[i]);
+ i++)
+ ;
+ if (i != t->read_buffer.count) {
+ success = false;
}
- if (!success || i != t->read_buffer.count || t->closed) {
+ GPR_TIMER_END("reading_action.parse", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked,
+ (void *)(uintptr_t)success, 0);
+}
+
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg) {
+ grpc_chttp2_transport_global *transport_global = &t->global;
+ grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
+ /* copy parsing qbuf to global qbuf */
+ gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
+ transport_global->concurrent_stream_count =
+ (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+ if (transport_parsing->initial_window_update != 0) {
+ grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
+ update_global_window, t);
+ transport_parsing->initial_window_update = 0;
+ }
+ /* handle higher level things */
+ grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
+ t->executor.parsing_active = 0;
+ /* handle delayed transport ops (if there is one) */
+ if (t->post_parsing_op) {
+ grpc_transport_op *op = t->post_parsing_op;
+ t->post_parsing_op = NULL;
+ perform_transport_op_locked(exec_ctx, t, NULL, op);
+ gpr_free(op);
+ }
+ /* if a stream is in the stream map, and gets cancelled, we need to
+ * ensure we are not parsing before continuing the cancellation to keep
+ * things in a sane state */
+ grpc_chttp2_stream_global *stream_global;
+ while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
+ &stream_global)) {
+ GPR_ASSERT(stream_global->in_stream_map);
+ GPR_ASSERT(stream_global->write_closed);
+ GPR_ASSERT(stream_global->read_closed);
+ remove_stream(exec_ctx, t, stream_global->id);
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
+ }
+
+ post_reading_action_locked(exec_ctx, t, s_unused, arg);
+}
+
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *arg) {
+ bool success = (bool)(uintptr_t)arg;
+ bool keep_reading = false;
+ if (!success || t->closed) {
drop_connection(exec_ctx, t);
- read_error_locked(exec_ctx, t);
+ t->endpoint_reading = 0;
+ if (!t->executor.writing_active && t->ep) {
+ grpc_endpoint_destroy(exec_ctx, t->ep);
+ t->ep = NULL;
+ /* safe as we still have a ref for read */
+ UNREF_TRANSPORT(exec_ctx, t, "disconnect");
+ }
} else if (!t->closed) {
- keep_reading = 1;
+ keep_reading = true;
REF_TRANSPORT(t, "keep_reading");
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
- unlock(exec_ctx, t);
if (keep_reading) {
- grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->recv_data);
- allow_endpoint_shutdown_unlocked(exec_ctx, t);
+ grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
+ allow_endpoint_shutdown_locked(exec_ctx, t);
UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
- UNREF_TRANSPORT(exec_ctx, t, "recv_data");
+ UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
-
- GPR_TIMER_END("recv_data", 0);
}
/*******************************************************************************
@@ -1517,7 +1648,7 @@ static void connectivity_state_set(
static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_pollset *pollset) {
+ grpc_chttp2_stream *s_unused, void *pollset) {
if (t->ep) {
grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
}
@@ -1525,7 +1656,8 @@ static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_pollset_set *pollset_set) {
+ grpc_chttp2_stream *s_unused,
+ void *pollset_set) {
if (t->ep) {
grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
}
@@ -1533,16 +1665,24 @@ static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- lock(t);
- add_to_pollset_locked(exec_ctx, t, pollset);
- unlock(exec_ctx, t);
+ /* TODO(ctiller): keep pollset alive */
+ grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+ (grpc_chttp2_stream *)gs,
+ add_to_pollset_locked, pollset, 0);
}
/*******************************************************************************
* BYTE STREAM
*/
+static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs) {
+ if (gpr_unref(&bs->refs)) {
+ gpr_slice_buffer_destroy(&bs->slices);
+ gpr_free(bs);
+ }
+}
+
static void incoming_byte_stream_update_flow_control(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
@@ -1583,87 +1723,146 @@ static void incoming_byte_stream_update_flow_control(
}
}
-static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- gpr_slice *slice, size_t max_size_hint,
- grpc_closure *on_complete) {
+typedef struct {
+ grpc_chttp2_incoming_byte_stream *byte_stream;
+ gpr_slice *slice;
+ size_t max_size_hint;
+ grpc_closure *on_complete;
+} incoming_byte_stream_next_arg;
+
+static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ incoming_byte_stream_next_arg *arg = argp;
grpc_chttp2_incoming_byte_stream *bs =
- (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ (grpc_chttp2_incoming_byte_stream *)arg->byte_stream;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
- lock(bs->transport);
if (bs->is_tail) {
- incoming_byte_stream_update_flow_control(transport_global, stream_global,
- max_size_hint, bs->slices.length);
+ incoming_byte_stream_update_flow_control(
+ transport_global, stream_global, arg->max_size_hint, bs->slices.length);
}
if (bs->slices.count > 0) {
- *slice = gpr_slice_buffer_take_first(&bs->slices);
- unlock(exec_ctx, bs->transport);
- return 1;
+ *arg->slice = gpr_slice_buffer_take_first(&bs->slices);
+ grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, true, NULL);
} else if (bs->failed) {
- grpc_exec_ctx_enqueue(exec_ctx, on_complete, false, NULL);
- unlock(exec_ctx, bs->transport);
- return 0;
+ grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, false, NULL);
} else {
- bs->on_next = on_complete;
- bs->next = slice;
- unlock(exec_ctx, bs->transport);
- return 0;
+ bs->on_next = arg->on_complete;
+ bs->next = arg->slice;
}
+ incoming_byte_stream_unref(exec_ctx, bs);
}
-static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) {
- if (gpr_unref(&bs->refs)) {
- gpr_slice_buffer_destroy(&bs->slices);
- gpr_free(bs);
- }
+static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ gpr_slice *slice, size_t max_size_hint,
+ grpc_closure *on_complete) {
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete};
+ gpr_ref(&bs->refs);
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_next_locked, &arg,
+ sizeof(arg));
+ return 0;
+}
+
+static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream);
+
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *byte_stream) {
+ grpc_chttp2_incoming_byte_stream *bs = byte_stream;
+ GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
+ decrement_active_streams_locked(exec_ctx, &bs->transport->global,
+ &bs->stream->global);
+ incoming_byte_stream_unref(exec_ctx, bs);
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
- incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream);
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_destroy_locked, bs, 0);
}
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- gpr_slice slice) {
- gpr_mu_lock(&bs->transport->mu);
+typedef struct {
+ grpc_chttp2_incoming_byte_stream *byte_stream;
+ gpr_slice slice;
+} incoming_byte_stream_push_arg;
+
+static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ incoming_byte_stream_push_arg *arg = argp;
+ grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
if (bs->on_next != NULL) {
- *bs->next = slice;
+ *bs->next = arg->slice;
grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL);
bs->on_next = NULL;
} else {
- gpr_slice_buffer_add(&bs->slices, slice);
+ gpr_slice_buffer_add(&bs->slices, arg->slice);
}
- gpr_mu_unlock(&bs->transport->mu);
+ incoming_byte_stream_unref(exec_ctx, bs);
+}
+
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice) {
+ incoming_byte_stream_push_arg arg = {bs, slice};
+ gpr_ref(&bs->refs);
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_push_locked, &arg,
+ sizeof(arg));
+}
+
+static void incoming_byte_stream_finished_failed_locked(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ void *argp) {
+ grpc_chttp2_incoming_byte_stream *bs = argp;
+ grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL);
+ bs->on_next = NULL;
+ bs->failed = 1;
+ incoming_byte_stream_unref(exec_ctx, bs);
+}
+
+static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ grpc_chttp2_incoming_byte_stream *bs = argp;
+ incoming_byte_stream_unref(exec_ctx, bs);
}
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success,
int from_parsing_thread) {
- if (!success) {
- if (from_parsing_thread) {
- gpr_mu_lock(&bs->transport->mu);
- }
- grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL);
- bs->on_next = NULL;
- bs->failed = 1;
- if (from_parsing_thread) {
- gpr_mu_unlock(&bs->transport->mu);
+ if (from_parsing_thread) {
+ if (success) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_finished_ok_locked,
+ bs, 0);
+ } else {
+ incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
+ bs->stream, bs);
}
} else {
-#ifndef NDEBUG
- if (from_parsing_thread) {
- gpr_mu_lock(&bs->transport->mu);
- }
- GPR_ASSERT(bs->on_next == NULL);
- if (from_parsing_thread) {
- gpr_mu_unlock(&bs->transport->mu);
+ if (success) {
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_finished_failed_locked, bs, 0);
+ } else {
+ incoming_byte_stream_finished_failed_locked(exec_ctx, bs->transport,
+ bs->stream, bs);
}
-#endif
}
- incoming_byte_stream_unref(bs);
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -1680,6 +1879,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing);
+ gpr_ref(&incoming_byte_stream->stream->global.active_streams);
gpr_slice_buffer_init(&incoming_byte_stream->slices);
incoming_byte_stream->on_next = NULL;
incoming_byte_stream->is_tail = 1;
@@ -1810,7 +2010,7 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
gpr_slice *slices, size_t nslices) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
- REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
+ REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */
gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
- recv_data(exec_ctx, t, 1);
+ reading_action(exec_ctx, t, 1);
}
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 98cd38abd4..8c4fa2d34a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -236,9 +236,6 @@ struct grpc_chttp2_transport_parsing {
/** was a goaway frame received? */
uint8_t goaway_received;
- /** the last sent max_table_size setting */
- uint32_t last_sent_max_table_size;
-
/** initial window change */
int64_t initial_window_update;
@@ -272,6 +269,9 @@ struct grpc_chttp2_transport_parsing {
uint32_t incoming_frame_size;
uint32_t incoming_stream_id;
+ /* current max frame size */
+ uint32_t max_frame_size;
+
/* active parser */
void *parser_data;
grpc_chttp2_stream_parsing *incoming_stream;
@@ -282,6 +282,8 @@ struct grpc_chttp2_transport_parsing {
/* received settings */
uint32_t settings[GRPC_CHTTP2_NUM_SETTINGS];
+ /* last settings that were sent */
+ uint32_t last_sent_settings[GRPC_CHTTP2_NUM_SETTINGS];
/* goaway data */
grpc_status_code goaway_error;
@@ -291,27 +293,45 @@ struct grpc_chttp2_transport_parsing {
int64_t outgoing_window;
};
+typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *arg);
+
+typedef struct grpc_chttp2_executor_action_header {
+ grpc_chttp2_stream *stream;
+ grpc_chttp2_locked_action action;
+ struct grpc_chttp2_executor_action_header *next;
+ void *arg;
+} grpc_chttp2_executor_action_header;
+
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
- grpc_endpoint *ep;
gpr_refcount refs;
+ grpc_endpoint *ep;
char *peer_string;
/** when this drops to zero it's safe to shutdown the endpoint */
gpr_refcount shutdown_ep_refs;
- gpr_mu mu;
+ struct {
+ gpr_mu mu;
+
+ /** is a thread currently in the global lock */
+ bool global_active;
+ /** is a thread currently writing */
+ bool writing_active;
+ /** is a thread currently parsing */
+ bool parsing_active;
+
+ grpc_chttp2_executor_action_header *pending_actions_head;
+ grpc_chttp2_executor_action_header *pending_actions_tail;
+ } executor;
/** is the transport destroying itself? */
uint8_t destroying;
/** has the upper layer closed the transport? */
uint8_t closed;
- /** is a thread currently writing */
- uint8_t writing_active;
- /** is a thread currently parsing */
- uint8_t parsing_active;
-
/** is there a read request to the endpoint outstanding? */
uint8_t endpoint_reading;
@@ -338,8 +358,10 @@ struct grpc_chttp2_transport {
/** closure to execute writing */
grpc_closure writing_action;
- /** closure to finish reading from the endpoint */
- grpc_closure recv_data;
+ /** closure to start reading from the endpoint */
+ grpc_closure reading_action;
+ /** closure to actually do parsing */
+ grpc_closure parsing_action;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -397,21 +419,26 @@ typedef struct {
grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats;
+ /** number of streams that are currently being read */
+ gpr_refcount active_streams;
+
/** when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- uint8_t write_closed;
+ bool write_closed;
/** is this stream reading half-closed (boolean) */
- uint8_t read_closed;
+ bool read_closed;
+ /** are all published incoming byte streams closed */
+ bool all_incoming_byte_streams_finished;
/** is this stream in the stream map? (boolean) */
- uint8_t in_stream_map;
+ bool in_stream_map;
/** has this stream seen an error? if 1, then pending incoming frames
can be thrown away */
- uint8_t seen_error;
+ bool seen_error;
- uint8_t published_initial_metadata;
- uint8_t published_trailing_metadata;
- uint8_t faked_trailing_metadata;
+ bool published_initial_metadata;
+ bool published_trailing_metadata;
+ bool faked_trailing_metadata;
grpc_chttp2_incoming_metadata_buffer received_initial_metadata;
grpc_chttp2_incoming_metadata_buffer received_trailing_metadata;
@@ -570,6 +597,9 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
void grpc_chttp2_list_add_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
+bool grpc_chttp2_list_remove_check_read_ops(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
@@ -645,6 +675,12 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_closure **pclosure, int success);
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *transport,
+ grpc_chttp2_stream *optional_stream,
+ grpc_chttp2_locked_action action,
+ void *arg, size_t sizeof_arg);
+
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index e827a43f7a..2995066e51 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -79,9 +79,12 @@ void grpc_chttp2_prepare_to_read(
GPR_TIMER_BEGIN("grpc_chttp2_prepare_to_read", 0);
transport_parsing->next_stream_id = transport_global->next_stream_id;
- transport_parsing->last_sent_max_table_size =
- transport_global->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE];
+ memcpy(transport_parsing->last_sent_settings,
+ transport_global->settings[GRPC_SENT_SETTINGS],
+ sizeof(transport_parsing->last_sent_settings));
+ transport_parsing->max_frame_size =
+ transport_global->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
/* update the parsing view of incoming window */
while (grpc_chttp2_list_pop_unannounced_incoming_window_available(
@@ -388,6 +391,12 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
return 1;
}
goto dts_fh_0; /* loop */
+ } else if (transport_parsing->incoming_frame_size >
+ transport_parsing->max_frame_size) {
+ gpr_log(GPR_DEBUG, "Frame size %d is larger than max frame size %d",
+ transport_parsing->incoming_frame_size,
+ transport_parsing->max_frame_size);
+ return 0;
}
if (++cur == end) {
return 1;
@@ -840,7 +849,11 @@ static int init_settings_frame_parser(
transport_parsing->settings_ack_received = 1;
grpc_chttp2_hptbl_set_max_bytes(
&transport_parsing->hpack_parser.table,
- transport_parsing->last_sent_max_table_size);
+ transport_parsing
+ ->last_sent_settings[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
+ transport_parsing->max_frame_size =
+ transport_parsing
+ ->last_sent_settings[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
}
transport_parsing->parser = grpc_chttp2_settings_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.settings;
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index e5b35aadca..8f3ab00e6d 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -305,6 +305,14 @@ void grpc_chttp2_list_add_check_read_ops(
GRPC_CHTTP2_LIST_CHECK_READ_OPS);
}
+bool grpc_chttp2_list_remove_check_read_ops(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_CHECK_READ_OPS);
+}
+
int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index e36066d863..ad182d1f69 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -213,14 +213,16 @@ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_pollset *pollset) {}
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+ void *and_free_memory) {
grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
size_t i;
/* destroy per-filter data */
for (i = 0; i < count; i++) {
- elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]);
+ elems[i].filter->destroy_call_elem(exec_ctx, &elems[i],
+ i == count - 1 ? and_free_memory : NULL);
}
}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 9e3a25a152..36c17cb467 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -104,8 +104,12 @@ typedef struct {
void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset);
/* Destroy per call data.
- The filter does not need to do any chaining */
- void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+ The filter does not need to do any chaining.
+ The bottom filter of a stack will be passed a non-NULL pointer to
+ \a and_free_memory that should be passed to gpr_free when destruction
+ is complete. */
+ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *and_free_memory);
/* sizeof(per channel data) */
size_t sizeof_channel_data;
@@ -223,7 +227,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
#endif
/* Destroy a call stack */
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+ void *and_free_memory);
/* Ignore set pollset - used by filters to implement the set_pollset method
if they don't care about pollsets at all. Does nothing. */
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 3d42d0e616..5510c79b18 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -47,6 +47,8 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
+int grpc_compress_filter_trace = 0;
+
typedef struct call_data {
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
@@ -169,9 +171,29 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
did_compress =
grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
+ if (grpc_compress_filter_trace) {
+ char *algo_name;
+ const size_t before_size = calld->slices.length;
+ const size_t after_size = tmp.length;
+ const float savings_ratio = 1.0f - (float)after_size / (float)before_size;
+ GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
+ &algo_name));
+ gpr_log(GPR_DEBUG,
+ "Compressed[%s] %d bytes vs. %d bytes (%.2f%% savings)",
+ algo_name, before_size, after_size, 100 * savings_ratio);
+ }
gpr_slice_buffer_swap(&calld->slices, &tmp);
calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ } else {
+ if (grpc_compress_filter_trace) {
+ char *algo_name;
+ GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
+ &algo_name));
+ gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.",
+ algo_name);
+ }
}
+
gpr_slice_buffer_destroy(&tmp);
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
@@ -246,8 +268,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
gpr_slice_buffer_destroy(&calld->slices);
diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h
index 0d973329c4..cf5879d82e 100644
--- a/src/core/lib/channel/compress_filter.h
+++ b/src/core/lib/channel/compress_filter.h
@@ -38,6 +38,8 @@
#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request"
+extern int grpc_compress_filter_trace;
+
/** Compression filter for outgoing data.
*
* See <grpc/compression.h> for the available compression settings.
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index c1debab4c6..68a3a7d6fd 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -102,12 +102,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *and_free_memory) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_transport_destroy_stream(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld));
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ and_free_memory);
}
/* Constructor for channel_data */
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 211f537c69..516e708d1f 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -155,8 +155,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {}
static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
unsigned i;
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index c140c61b8f..ba865416de 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -39,6 +39,9 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/transport/static_metadata.h"
+#define EXPECTED_CONTENT_TYPE "application/grpc"
+#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
+
typedef struct call_data {
uint8_t seen_path;
uint8_t seen_method;
@@ -92,8 +95,11 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
require */
return NULL;
} else if (md->key == GRPC_MDSTR_CONTENT_TYPE) {
- if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) ==
- 0) {
+ const char *value_str = grpc_mdstr_as_c_string(md->value);
+ if (strncmp(value_str, EXPECTED_CONTENT_TYPE,
+ EXPECTED_CONTENT_TYPE_LENGTH) == 0 &&
+ (value_str[EXPECTED_CONTENT_TYPE_LENGTH] == '+' ||
+ value_str[EXPECTED_CONTENT_TYPE_LENGTH] == ';')) {
/* Although the C implementation doesn't (currently) generate them,
any custom +-suffix is explicitly valid. */
/* TODO(klempner): We should consider preallocating common values such
@@ -102,8 +108,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
} else {
/* TODO(klempner): We're currently allowing this, but we shouldn't
see it without a proxy so log for now. */
- gpr_log(GPR_INFO, "Unexpected content-type %s",
- grpc_mdstr_as_c_string(md->value));
+ gpr_log(GPR_INFO, "Unexpected content-type %s", value_str);
}
return NULL;
} else if (md->key == GRPC_MDSTR_TE || md->key == GRPC_MDSTR_METHOD ||
@@ -220,8 +225,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c
index 921c772453..a7efb5e73e 100644
--- a/src/core/lib/http/parser.c
+++ b/src/core/lib/http/parser.c
@@ -172,8 +172,8 @@ static int add_header(grpc_http_parser *parser) {
while (cur != end && (*cur == ' ' || *cur == '\t')) {
cur++;
}
- GPR_ASSERT(end - cur >= 2);
- hdr.value = buf2str(cur, (size_t)(end - cur) - 2);
+ GPR_ASSERT((size_t)(end - cur) >= parser->cur_line_end_length);
+ hdr.value = buf2str(cur, (size_t)(end - cur) - parser->cur_line_end_length);
if (parser->type == GRPC_HTTP_RESPONSE) {
hdr_count = &parser->http.response.hdr_count;
@@ -208,7 +208,7 @@ static int finish_line(grpc_http_parser *parser) {
parser->state = GRPC_HTTP_HEADERS;
break;
case GRPC_HTTP_HEADERS:
- if (parser->cur_line_length == 2) {
+ if (parser->cur_line_length == parser->cur_line_end_length) {
parser->state = GRPC_HTTP_BODY;
break;
}
@@ -248,6 +248,30 @@ static int addbyte_body(grpc_http_parser *parser, uint8_t byte) {
return 1;
}
+static int check_line(grpc_http_parser *parser) {
+ if (parser->cur_line_length >= 2 &&
+ parser->cur_line[parser->cur_line_length - 2] == '\r' &&
+ parser->cur_line[parser->cur_line_length - 1] == '\n') {
+ return 1;
+ }
+
+ // HTTP request with \n\r line termiantors.
+ else if (parser->cur_line_length >= 2 &&
+ parser->cur_line[parser->cur_line_length - 2] == '\n' &&
+ parser->cur_line[parser->cur_line_length - 1] == '\r') {
+ return 1;
+ }
+
+ // HTTP request with only \n line terminators.
+ else if (parser->cur_line_length >= 1 &&
+ parser->cur_line[parser->cur_line_length - 1] == '\n') {
+ parser->cur_line_end_length = 1;
+ return 1;
+ }
+
+ return 0;
+}
+
static int addbyte(grpc_http_parser *parser, uint8_t byte) {
switch (parser->state) {
case GRPC_HTTP_FIRST_LINE:
@@ -260,9 +284,7 @@ static int addbyte(grpc_http_parser *parser, uint8_t byte) {
}
parser->cur_line[parser->cur_line_length] = byte;
parser->cur_line_length++;
- if (parser->cur_line_length >= 2 &&
- parser->cur_line[parser->cur_line_length - 2] == '\r' &&
- parser->cur_line[parser->cur_line_length - 1] == '\n') {
+ if (check_line(parser)) {
return finish_line(parser);
} else {
return 1;
@@ -278,6 +300,7 @@ void grpc_http_parser_init(grpc_http_parser *parser) {
memset(parser, 0, sizeof(*parser));
parser->state = GRPC_HTTP_FIRST_LINE;
parser->type = GRPC_HTTP_UNKNOWN;
+ parser->cur_line_end_length = 2;
}
void grpc_http_parser_destroy(grpc_http_parser *parser) {
diff --git a/src/core/lib/http/parser.h b/src/core/lib/http/parser.h
index 42fa5181b8..536637e9a2 100644
--- a/src/core/lib/http/parser.h
+++ b/src/core/lib/http/parser.h
@@ -105,6 +105,7 @@ typedef struct {
uint8_t cur_line[GRPC_HTTP_PARSER_MAX_HEADER_LENGTH];
size_t cur_line_length;
+ size_t cur_line_end_length;
} grpc_http_parser;
void grpc_http_parser_init(grpc_http_parser *parser);
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 0eb95a2e09..7df1751352 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -44,7 +44,6 @@
static const grpc_event_engine_vtable *g_event_engine;
grpc_poll_function_type grpc_poll_function = poll;
-grpc_wakeup_fd grpc_global_wakeup_fd;
void grpc_event_engine_init(void) {
if ((g_event_engine = grpc_init_poll_and_epoll_posix())) {
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 146663984d..60cef8ba77 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -166,8 +166,10 @@ bool grpc_iomgr_abort_on_leaks(void) {
if (env == NULL) return false;
static const char *truthy[] = {"yes", "Yes", "YES", "true",
"True", "TRUE", "1"};
+ bool should_we = false;
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
- if (0 == strcmp(env, truthy[i])) return true;
+ if (0 == strcmp(env, truthy[i])) should_we = true;
}
- return false;
+ gpr_free(env);
+ return should_we;
}
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index a65089c017..914736234d 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -167,8 +167,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_executor_enqueue(&r->request_closure, 1);
}
-void (*grpc_resolved_address)(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port, grpc_resolve_cb cb,
- void *arg) = resolve_address_impl;
+void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) = resolve_address_impl;
#endif
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 7d78beb15a..66f9ff7a46 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -63,39 +63,45 @@ typedef struct {
grpc_endpoint **endpoint;
} async_connect;
-static void async_connect_unlock_and_cleanup(async_connect *ac) {
+static void async_connect_unlock_and_cleanup(async_connect *ac,
+ grpc_winsocket *socket) {
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
- if (ac->socket != NULL) grpc_winsocket_destroy(ac->socket);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
}
+ if (socket != NULL) grpc_winsocket_destroy(socket);
}
static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
- /* If the alarm didn't occur, it got cancelled. */
- if (ac->socket != NULL && occured) {
+ if (ac->socket != NULL) {
grpc_winsocket_shutdown(ac->socket);
}
- async_connect_unlock_and_cleanup(ac);
+ async_connect_unlock_and_cleanup(ac, ac->socket);
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint **ep = ac->endpoint;
+ GPR_ASSERT(*ep == NULL);
grpc_winsocket_callback_info *info = &ac->socket->write_info;
grpc_closure *on_done = ac->on_done;
+ gpr_mu_lock(&ac->mu);
+ grpc_winsocket *socket = ac->socket;
+ ac->socket = NULL;
+ gpr_mu_unlock(&ac->mu);
+
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
- if (from_iocp) {
+ if (from_iocp && socket != NULL) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@@ -107,12 +113,12 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
ac->addr_name, utf8_message);
gpr_free(utf8_message);
} else {
- *ep = grpc_tcp_create(ac->socket, ac->addr_name);
- ac->socket = NULL;
+ *ep = grpc_tcp_create(socket, ac->addr_name);
+ socket = NULL;
}
}
- async_connect_unlock_and_cleanup(ac);
+ async_connect_unlock_and_cleanup(ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
on_done->cb(exec_ctx, on_done->cb_arg, *ep != NULL);
@@ -138,6 +144,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
const char *message = NULL;
char *utf8_message;
grpc_winsocket_callback_info *info;
+ int last_error;
*endpoint = NULL;
@@ -208,8 +215,10 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
return;
failure:
- utf8_message = gpr_format_message(WSAGetLastError());
+ last_error = WSAGetLastError();
+ utf8_message = gpr_format_message(last_error);
gpr_log(GPR_ERROR, message, utf8_message);
+ gpr_log(GPR_ERROR, "last error = %d", last_error);
gpr_free(utf8_message);
if (socket != NULL) {
grpc_winsocket_destroy(socket);
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 6940dec7b0..125f521d87 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -508,34 +508,6 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
}
-unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s,
- unsigned port_index) {
- grpc_tcp_listener *sp;
- for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
- ;
- if (sp) {
- return 1;
- } else {
- return 0;
- }
-}
-
-int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
- unsigned fd_index) {
- grpc_tcp_listener *sp;
- if (fd_index != 0) {
- /* Windows implementation has only one fd per port_index. */
- return -1;
- }
- for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
- ;
- if (sp) {
- return _open_osfhandle((intptr_t)sp->socket->socket, 0);
- } else {
- return -1;
- }
-}
-
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_pollset **pollset, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb,
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 7ee689a7e4..551149e1a6 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -35,6 +35,8 @@
#ifdef GPR_WINSOCK_SOCKET
+#include <limits.h>
+
#include "src/core/lib/iomgr/sockaddr_win32.h"
#include <grpc/support/alloc.h>
@@ -51,12 +53,20 @@
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
+#if defined(__MSYS__) && defined(GPR_ARCH_64)
+/* Nasty workaround for nasty bug when using the 64 bits msys compiler
+ in conjunction with Microsoft Windows headers. */
+#define GRPC_FIONBIO _IOW('f', 126, uint32_t)
+#else
+#define GRPC_FIONBIO FIONBIO
+#endif
+
static int set_non_block(SOCKET sock) {
int status;
- unsigned long param = 1;
+ uint32_t param = 1;
DWORD ret;
- status =
- WSAIoctl(sock, FIONBIO, &param, sizeof(param), NULL, 0, &ret, NULL, NULL);
+ status = WSAIoctl(sock, GRPC_FIONBIO, &param, sizeof(param), NULL, 0, &ret,
+ NULL, NULL);
return status == 0;
}
diff --git a/src/core/lib/security/client_auth_filter.c b/src/core/lib/security/client_auth_filter.c
index 943b1da85c..8b58cb86bf 100644
--- a/src/core/lib/security/client_auth_filter.c
+++ b/src/core/lib/security/client_auth_filter.c
@@ -277,8 +277,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {
call_data *calld = elem->call_data;
grpc_call_credentials_unref(calld->creds);
if (calld->host != NULL) {
diff --git a/src/core/lib/security/server_auth_filter.c b/src/core/lib/security/server_auth_filter.c
index 7844dc87cb..3320497d21 100644
--- a/src/core/lib/security/server_auth_filter.c
+++ b/src/core/lib/security/server_auth_filter.c
@@ -224,8 +224,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset) {}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/support/env_win32.c b/src/core/lib/support/env_win32.c
index ef84c941df..e670e1e8d0 100644
--- a/src/core/lib/support/env_win32.c
+++ b/src/core/lib/support/env_win32.c
@@ -33,41 +33,47 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_ENV
+
+#include <windows.h>
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
-
-#ifdef __MINGW32__
-errno_t getenv_s(size_t *size_needed, char *buffer, size_t size,
- const char *varname);
-#else
-#include <stdlib.h>
-#endif
+#include "src/core/lib/support/string_win32.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
char *gpr_getenv(const char *name) {
- size_t size;
char *result = NULL;
- errno_t err;
+ DWORD size;
+ LPTSTR tresult = NULL;
+ LPTSTR tname = gpr_char_to_tchar(name);
+ DWORD ret;
- err = getenv_s(&size, NULL, 0, name);
- if (err || (size == 0)) return NULL;
- result = gpr_malloc(size);
- err = getenv_s(&size, result, size, name);
- if (err) {
- gpr_free(result);
+ ret = GetEnvironmentVariable(tname, NULL, 0);
+ if (ret == 0) return NULL;
+ size = ret * (DWORD)sizeof(TCHAR);
+ tresult = gpr_malloc(size);
+ ret = GetEnvironmentVariable(tname, tresult, size);
+ gpr_free(tname);
+ if (ret == 0) {
+ gpr_free(tresult);
return NULL;
}
+ result = gpr_tchar_to_char(tresult);
+ gpr_free(tresult);
return result;
}
void gpr_setenv(const char *name, const char *value) {
- errno_t res = _putenv_s(name, value);
- GPR_ASSERT(res == 0);
+ LPTSTR tname = gpr_char_to_tchar(name);
+ LPTSTR tvalue = gpr_char_to_tchar(value);
+ BOOL res = SetEnvironmentVariable(tname, tvalue);
+ gpr_free(tname);
+ gpr_free(tvalue);
+ GPR_ASSERT(res);
}
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_ENV */
diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c
index ff3febb38e..ca04c022e3 100644
--- a/src/core/lib/support/log_linux.c
+++ b/src/core/lib/support/log_linux.c
@@ -41,7 +41,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_LINUX
+#ifdef GPR_LINUX_LOG
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -103,4 +103,4 @@ void gpr_default_log(gpr_log_func_args *args) {
gpr_free(prefix);
}
-#endif
+#endif /* GPR_LINUX_LOG */
diff --git a/src/core/lib/support/log_win32.c b/src/core/lib/support/log_win32.c
index ba78497a0a..29735bd18c 100644
--- a/src/core/lib/support/log_win32.c
+++ b/src/core/lib/support/log_win32.c
@@ -33,7 +33,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_LOG
#include <stdarg.h>
#include <stdio.h>
@@ -109,18 +109,4 @@ void gpr_default_log(gpr_log_func_args *args) {
fflush(stderr);
}
-char *gpr_format_message(int messageid) {
- LPTSTR tmessage;
- char *message;
- DWORD status = FormatMessage(
- FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
- FORMAT_MESSAGE_IGNORE_INSERTS,
- NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
- (LPTSTR)(&tmessage), 0, NULL);
- if (status == 0) return gpr_strdup("Unable to retrieve error string");
- message = gpr_tchar_to_char(tmessage);
- LocalFree(tmessage);
- return message;
-}
-
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_LOG */
diff --git a/src/core/lib/support/string_util_win32.c b/src/core/lib/support/string_util_win32.c
new file mode 100644
index 0000000000..f3cb0c050f
--- /dev/null
+++ b/src/core/lib/support/string_util_win32.c
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Posix code for gpr snprintf support. */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WIN32
+
+/* Some platforms (namely msys) need wchar to be included BEFORE
+ anything else, especially strsafe.h. */
+#include <wchar.h>
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+#include <strsafe.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/support/string.h"
+
+#if defined UNICODE || defined _UNICODE
+LPTSTR
+gpr_char_to_tchar(LPCSTR input) {
+ LPTSTR ret;
+ int needed = MultiByteToWideChar(CP_UTF8, 0, input, -1, NULL, 0);
+ if (needed <= 0) return NULL;
+ ret = gpr_malloc((unsigned)needed * sizeof(TCHAR));
+ MultiByteToWideChar(CP_UTF8, 0, input, -1, ret, needed);
+ return ret;
+}
+
+LPSTR
+gpr_tchar_to_char(LPCTSTR input) {
+ LPSTR ret;
+ int needed = WideCharToMultiByte(CP_UTF8, 0, input, -1, NULL, 0, NULL, NULL);
+ if (needed <= 0) return NULL;
+ ret = gpr_malloc((unsigned)needed);
+ WideCharToMultiByte(CP_UTF8, 0, input, -1, ret, needed, NULL, NULL);
+ return ret;
+}
+#else
+char *gpr_tchar_to_char(LPTSTR input) { return gpr_strdup(input); }
+
+char *gpr_char_to_tchar(LPTSTR input) { return gpr_strdup(input); }
+#endif
+
+char *gpr_format_message(int messageid) {
+ LPTSTR tmessage;
+ char *message;
+ DWORD status = FormatMessage(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR)(&tmessage), 0, NULL);
+ if (status == 0) return gpr_strdup("Unable to retrieve error string");
+ message = gpr_tchar_to_char(tmessage);
+ LocalFree(tmessage);
+ return message;
+}
+
+#endif /* GPR_WIN32 */
diff --git a/src/core/lib/support/string_win32.c b/src/core/lib/support/string_win32.c
index a2f9857356..6b92f79253 100644
--- a/src/core/lib/support/string_win32.c
+++ b/src/core/lib/support/string_win32.c
@@ -31,11 +31,11 @@
*
*/
-/* Posix code for gpr snprintf support. */
+/* Windows code for gpr snprintf support. */
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_STRING
#include <stdarg.h>
#include <stdio.h>
@@ -80,30 +80,4 @@ int gpr_asprintf(char **strp, const char *format, ...) {
return -1;
}
-#if defined UNICODE || defined _UNICODE
-LPTSTR
-gpr_char_to_tchar(LPCSTR input) {
- LPTSTR ret;
- int needed = MultiByteToWideChar(CP_UTF8, 0, input, -1, NULL, 0);
- if (needed <= 0) return NULL;
- ret = gpr_malloc((unsigned)needed * sizeof(TCHAR));
- MultiByteToWideChar(CP_UTF8, 0, input, -1, ret, needed);
- return ret;
-}
-
-LPSTR
-gpr_tchar_to_char(LPCTSTR input) {
- LPSTR ret;
- int needed = WideCharToMultiByte(CP_UTF8, 0, input, -1, NULL, 0, NULL, NULL);
- if (needed <= 0) return NULL;
- ret = gpr_malloc((unsigned)needed);
- WideCharToMultiByte(CP_UTF8, 0, input, -1, ret, needed, NULL, NULL);
- return ret;
-}
-#else
-char *gpr_tchar_to_char(LPTSTR input) { return gpr_strdup(input); }
-
-char *gpr_char_to_tchar(LPTSTR input) { return gpr_strdup(input); }
-#endif
-
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_STRING */
diff --git a/src/core/lib/support/time_win32.c b/src/core/lib/support/time_win32.c
index f7acbd14a6..9e924ab3f4 100644
--- a/src/core/lib/support/time_win32.c
+++ b/src/core/lib/support/time_win32.c
@@ -35,7 +35,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_TIME
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -107,4 +107,4 @@ void gpr_sleep_until(gpr_timespec until) {
}
}
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_TIME */
diff --git a/src/core/lib/support/tmpfile_msys.c b/src/core/lib/support/tmpfile_msys.c
new file mode 100644
index 0000000000..2fdc89a64f
--- /dev/null
+++ b/src/core/lib/support/tmpfile_msys.c
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_MSYS_TMPFILE
+
+#include <io.h>
+#include <stdio.h>
+#include <string.h>
+#include <tchar.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/support/string_win32.h"
+#include "src/core/lib/support/tmpfile.h"
+
+FILE *gpr_tmpfile(const char *prefix, char **tmp_filename_out) {
+ FILE *result = NULL;
+ char tmp_filename[MAX_PATH];
+ UINT success;
+
+ if (tmp_filename_out != NULL) *tmp_filename_out = NULL;
+
+ /* Generate a unique filename with our template + temporary path. */
+ success = GetTempFileNameA(".", prefix, 0, tmp_filename);
+ fprintf(stderr, "success = %d\n", success);
+
+ if (success) {
+ /* Open a file there. */
+ result = fopen(tmp_filename, "wb+");
+ fprintf(stderr, "result = %p\n", result);
+ }
+ if (result != NULL && tmp_filename_out) {
+ *tmp_filename_out = gpr_strdup(tmp_filename);
+ }
+
+ return result;
+}
+
+#endif /* GPR_MSYS_TMPFILE */
diff --git a/src/core/lib/support/tmpfile_posix.c b/src/core/lib/support/tmpfile_posix.c
index 9e0e7ad808..0cd4bb6fc3 100644
--- a/src/core/lib/support/tmpfile_posix.c
+++ b/src/core/lib/support/tmpfile_posix.c
@@ -33,7 +33,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_POSIX_FILE
+#ifdef GPR_POSIX_TMPFILE
#include "src/core/lib/support/tmpfile.h"
@@ -82,4 +82,4 @@ end:
return result;
}
-#endif /* GPR_POSIX_FILE */
+#endif /* GPR_POSIX_TMPFILE */
diff --git a/src/core/lib/support/tmpfile_win32.c b/src/core/lib/support/tmpfile_win32.c
index 0cb2904f8d..9ac73128c3 100644
--- a/src/core/lib/support/tmpfile_win32.c
+++ b/src/core/lib/support/tmpfile_win32.c
@@ -33,7 +33,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_TMPFILE
#include <io.h>
#include <stdio.h>
@@ -81,4 +81,4 @@ end:
return result;
}
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_TMPFILE */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index fa12b6ea61..9b2b94eedf 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -373,8 +373,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
- grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
gpr_mu_destroy(&c->mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (c->status[i].details) {
@@ -392,7 +390,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
if (c->cq) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- gpr_free(c);
+ grpc_channel *channel = c->channel;
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c);
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
GPR_TIMER_END("destroy_call", 0);
}
@@ -1517,3 +1517,39 @@ grpc_compression_algorithm grpc_call_compression_for_level(
gpr_mu_unlock(&call->mu);
return grpc_compression_algorithm_for_level(level, accepted_encodings);
}
+
+const char *grpc_call_error_to_string(grpc_call_error error) {
+ switch (error) {
+ case GRPC_CALL_ERROR:
+ return "GRPC_CALL_ERROR";
+ case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
+ return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
+ case GRPC_CALL_ERROR_ALREADY_FINISHED:
+ return "GRPC_CALL_ERROR_ALREADY_FINISHED";
+ case GRPC_CALL_ERROR_ALREADY_INVOKED:
+ return "GRPC_CALL_ERROR_ALREADY_INVOKED";
+ case GRPC_CALL_ERROR_BATCH_TOO_BIG:
+ return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
+ case GRPC_CALL_ERROR_INVALID_FLAGS:
+ return "GRPC_CALL_ERROR_INVALID_FLAGS";
+ case GRPC_CALL_ERROR_INVALID_MESSAGE:
+ return "GRPC_CALL_ERROR_INVALID_MESSAGE";
+ case GRPC_CALL_ERROR_INVALID_METADATA:
+ return "GRPC_CALL_ERROR_INVALID_METADATA";
+ case GRPC_CALL_ERROR_NOT_INVOKED:
+ return "GRPC_CALL_ERROR_NOT_INVOKED";
+ case GRPC_CALL_ERROR_NOT_ON_CLIENT:
+ return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
+ case GRPC_CALL_ERROR_NOT_ON_SERVER:
+ return "GRPC_CALL_ERROR_NOT_ON_SERVER";
+ case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
+ return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
+ case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
+ return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
+ case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
+ return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_OK:
+ return "GRPC_CALL_OK";
+ }
+ GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
+}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 5ec8808b50..1f82c3bad2 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -227,6 +227,10 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
#endif
GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
+ GRPC_API_TRACE(
+ "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, "
+ "done_arg=%p, storage=%p)",
+ 7, (exec_ctx, cc, tag, success, done, done_arg, storage));
storage->tag = tag;
storage->done = done;
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 03f379aba8..57c6897626 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -164,10 +164,10 @@ void grpc_init(void) {
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
grpc_register_tracer("http1", &grpc_http1_trace);
+ grpc_register_tracer("compression", &grpc_compress_filter_trace);
grpc_security_pre_init();
grpc_iomgr_init();
grpc_executor_init();
- grpc_tracer_init("GRPC_TRACE");
gpr_timers_global_init();
grpc_cq_global_init();
for (i = 0; i < g_number_of_plugins; i++) {
@@ -179,6 +179,7 @@ void grpc_init(void) {
* at the appropriate time */
grpc_register_security_filters();
register_builtin_channel_init();
+ grpc_tracer_init("GRPC_TRACE");
/* no more changes to channel init pipelines */
grpc_channel_init_finalize();
}
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 80bd95df68..f50ec54cea 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -107,8 +107,10 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {}
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *and_free_memory) {
+ gpr_free(and_free_memory);
+}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index ad8ee8c7a9..2db95b9cdf 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -820,8 +820,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_ref(chand->server);
}
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c
index 779efbb97d..5847ec9053 100644
--- a/src/core/lib/transport/metadata.c
+++ b/src/core/lib/transport/metadata.c
@@ -386,10 +386,18 @@ grpc_mdstr *grpc_mdstr_from_buffer(const uint8_t *buf, size_t length) {
for (s = shard->strs[idx]; s; s = s->bucket_next) {
if (s->hash == hash && GPR_SLICE_LENGTH(s->slice) == length &&
0 == memcmp(buf, GPR_SLICE_START_PTR(s->slice), length)) {
- GRPC_MDSTR_REF((grpc_mdstr *)s);
- gpr_mu_unlock(&shard->mu);
- GPR_TIMER_END("grpc_mdstr_from_buffer", 0);
- return (grpc_mdstr *)s;
+ if (gpr_atm_full_fetch_add(&s->refcnt, 1) == 0) {
+ /* If we get here, we've added a ref to something that was about to
+ * die - drop it immediately.
+ * The *only* possible path here (given the shard mutex) should be to
+ * drop from one ref back to zero - assert that with a CAS */
+ GPR_ASSERT(gpr_atm_rel_cas(&s->refcnt, 1, 0));
+ /* and treat this as if we were never here... sshhh */
+ } else {
+ gpr_mu_unlock(&shard->mu);
+ GPR_TIMER_END("grpc_mdstr_from_buffer", 0);
+ return (grpc_mdstr *)s;
+ }
}
}
@@ -397,7 +405,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(const uint8_t *buf, size_t length) {
if (length + 1 < GPR_SLICE_INLINED_SIZE) {
/* string data goes directly into the slice */
s = gpr_malloc(sizeof(internal_string));
- gpr_atm_rel_store(&s->refcnt, 2);
+ gpr_atm_rel_store(&s->refcnt, 1);
s->slice.refcount = NULL;
memcpy(s->slice.data.inlined.bytes, buf, length);
s->slice.data.inlined.bytes[length] = 0;
@@ -406,7 +414,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(const uint8_t *buf, size_t length) {
/* string data goes after the internal_string header, and we +1 for null
terminator */
s = gpr_malloc(sizeof(internal_string) + length + 1);
- gpr_atm_rel_store(&s->refcnt, 2);
+ gpr_atm_rel_store(&s->refcnt, 1);
s->refcount.ref = slice_ref;
s->refcount.unref = slice_unref;
s->slice.refcount = &s->refcount;
@@ -675,20 +683,19 @@ const char *grpc_mdstr_as_c_string(grpc_mdstr *s) {
grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) {
internal_string *s = (internal_string *)gs;
if (is_mdstr_static(gs)) return gs;
- GPR_ASSERT(gpr_atm_full_fetch_add(&s->refcnt, 1) != 0);
+ GPR_ASSERT(gpr_atm_full_fetch_add(&s->refcnt, 1) > 0);
return gs;
}
void grpc_mdstr_unref(grpc_mdstr *gs DEBUG_ARGS) {
internal_string *s = (internal_string *)gs;
if (is_mdstr_static(gs)) return;
- if (2 == gpr_atm_full_fetch_add(&s->refcnt, -1)) {
+ if (1 == gpr_atm_full_fetch_add(&s->refcnt, -1)) {
strtab_shard *shard =
&g_strtab_shard[SHARD_IDX(s->hash, LOG2_STRTAB_SHARD_COUNT)];
gpr_mu_lock(&shard->mu);
- if (1 == gpr_atm_no_barrier_load(&s->refcnt)) {
- internal_destroy_string(shard, s);
- }
+ GPR_ASSERT(0 == gpr_atm_no_barrier_load(&s->refcnt));
+ internal_destroy_string(shard, s);
gpr_mu_unlock(&shard->mu);
}
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 53c634adca..e6d524abe6 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -133,8 +133,9 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream) {
- transport->vtable->destroy_stream(exec_ctx, transport, stream);
+ grpc_stream *stream, void *and_free_memory) {
+ transport->vtable->destroy_stream(exec_ctx, transport, stream,
+ and_free_memory);
}
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 1eb446312b..482a9d1791 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -98,6 +98,11 @@ void grpc_transport_move_stats(grpc_transport_stream_stats *from,
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
+ /** Should be enqueued when all requested operations (excluding recv_message
+ and recv_initial_metadata which have their own closures) in a given batch
+ have been completed. */
+ grpc_closure *on_complete;
+
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;
@@ -129,11 +134,6 @@ typedef struct grpc_transport_stream_op {
/** Collect any stats into provided buffer, zero internal stat counters */
grpc_transport_stream_stats *collect_stats;
- /** Should be enqueued when all requested operations (excluding recv_message
- and recv_initial_metadata which have their own closures) in a given batch
- have been completed. */
- grpc_closure *on_complete;
-
/** If != GRPC_STATUS_OK, cancel this stream */
grpc_status_code cancel_with_status;
@@ -213,7 +213,7 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
caller, but any child memory must be cleaned up) */
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream);
+ grpc_stream *stream, void *and_free_memory);
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index 2ff67073af..956155eec8 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -63,7 +63,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
- grpc_stream *stream);
+ grpc_stream *stream, void *and_free_memory);
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);