diff options
-rw-r--r-- | src/core/census/grpc_filter.c | 4 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 6 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 11 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 5 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 4 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 13 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 4 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 4 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 6 | ||||
-rw-r--r-- | src/core/security/client_auth_filter.c | 4 | ||||
-rw-r--r-- | src/core/security/server_auth_filter.c | 4 | ||||
-rw-r--r-- | src/core/surface/call.c | 3 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 12 | ||||
-rw-r--r-- | src/core/surface/server.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 57 | ||||
-rw-r--r-- | src/core/transport/transport.c | 5 | ||||
-rw-r--r-- | src/core/transport/transport.h | 2 | ||||
-rw-r--r-- | src/core/transport/transport_impl.h | 2 | ||||
-rw-r--r-- | test/core/channel/channel_stack_test.c | 6 |
19 files changed, 83 insertions, 73 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index c8aaf31e2d..987407f050 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -134,7 +134,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx, } static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { + grpc_call_element *elem, void *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ @@ -152,7 +152,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx, } static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { + grpc_call_element *elem, void *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 3e61688364..1869597975 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -213,14 +213,16 @@ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pollset *pollset) {} -void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) { +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, + void *and_free_memory) { grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ for (i = 0; i < count; i++) { - elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]); + elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], + i == count - 1 ? and_free_memory : NULL); } } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 52362f0b20..4407333aff 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -104,8 +104,12 @@ typedef struct { void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pollset *pollset); /* Destroy per call data. - The filter does not need to do any chaining */ - void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + The filter does not need to do any chaining. + The bottom filter of a stack will be passed a non-NULL pointer to + \a and_free_memory that should be passed to gpr_free when destruction + is complete. */ + void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory); /* sizeof(per channel data) */ size_t sizeof_channel_data; @@ -223,7 +227,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, #endif /* Destroy a call stack */ -void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, + void *and_free_memory); /* Ignore set pollset - used by filters to implement the set_pollset method if they don't care about pollsets at all. Does nothing. */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index f021a8ae32..27d6e03d28 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -379,9 +379,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); + gpr_free(and_free_memory); } /* Constructor for channel_data */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 3e7ca08fd2..d5d5fc2e35 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -246,8 +246,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; gpr_slice_buffer_destroy(&calld->slices); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index e7ed3ccfeb..e6e9daa452 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -37,13 +37,13 @@ #include <stdio.h> #include <string.h> -#include "src/core/support/string.h" -#include "src/core/transport/transport.h" -#include "src/core/profiling/timers.h" #include <grpc/byte_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> +#include "src/core/profiling/timers.h" +#include "src/core/support/string.h" +#include "src/core/transport/transport.h" #define MAX_BUFFER_LENGTH 8192 @@ -102,12 +102,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_transport_destroy_stream(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld)); + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + and_free_memory); } /* Constructor for channel_data */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 1aa27208c2..26f1110ac4 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -151,8 +151,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { unsigned i; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 370f8dbe42..6c97a0762d 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -212,8 +212,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 8f150a8d81..7a61df40df 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -620,9 +620,9 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, bool success) { grpc_subchannel_call *c = call; GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call"); - gpr_free(c); + grpc_connected_subchannel *connection = c->connection; + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 332d4259d2..c72bfc40e6 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -277,8 +277,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { call_data *calld = elem->call_data; grpc_call_credentials_unref(calld->creds); if (calld->host != NULL) { diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 3d8e5e8d35..3d348c8266 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -224,8 +224,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pollset *pollset) {} /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/surface/call.c b/src/core/surface/call.c index d9808e6f4c..e7861b8e3b 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -375,7 +375,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); gpr_mu_destroy(&c->mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { @@ -394,7 +393,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { if (c->cq) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - gpr_free(c); + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c); GPR_TIMER_END("destroy_call", 0); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 58f89946d2..9ef88bba65 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -37,13 +37,13 @@ #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/channel/channel_stack.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" -#include "src/core/surface/channel.h" #include "src/core/surface/call.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> +#include "src/core/surface/channel.h" typedef struct { grpc_linked_mdelem status; @@ -104,8 +104,10 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) {} -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { + gpr_free(and_free_memory); +} static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, diff --git a/src/core/surface/server.c b/src/core/surface/server.c index da93474b26..88554e4224 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -692,8 +692,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, server_ref(chand->server); } -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 182c713ee7..3a73826c3a 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -512,26 +512,20 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, return 0; } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; +static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *arg) { grpc_byte_stream *bs; -#if 0 - int i; - GPR_TIMER_BEGIN("destroy_stream", 0); - gpr_mu_lock(&t->mu); - GPR_ASSERT((s->global.write_closed && s->global.read_closed) || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(exec_ctx, t); + close_transport_locked(exec_ctx, t, NULL, NULL); } - if (!t->parsing_active && s->global.id) { + if (!t->executor.parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); } @@ -539,11 +533,11 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); -#endif - int i; + /* TODO(ctiller): the remainder of this function could be done without the + global lock */ - for (i = 0; i < STREAM_LIST_COUNT; i++) { + for (int i = 0; i < STREAM_LIST_COUNT; i++) { if (s->included[i]) { gpr_log(GPR_ERROR, "%s stream %d still included in list %d", t->global.is_client ? "client" : "server", s->global.id, i); @@ -574,6 +568,17 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, UNREF_TRANSPORT(exec_ctx, t, "stream"); GPR_TIMER_END("destroy_stream", 0); + + gpr_free(arg); +} + +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, void *and_free_memory) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked, + and_free_memory, 0); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -1509,8 +1514,8 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(exec_ctx, &t->parsing, - t->read_buffer.slices[i]); + grpc_chttp2_perform_read(exec_ctx, &t->parsing, + t->read_buffer.slices[i]); i++) ; if (i != t->read_buffer.count) { @@ -1602,10 +1607,9 @@ static void connectivity_state_set( grpc_connectivity_state state, const char *reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); - grpc_connectivity_state_set( - exec_ctx, - &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, reason); + grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global) + ->channel_callback.state_tracker, + state, reason); } /******************************************************************************* @@ -1915,15 +1919,10 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); } -static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), - "chttp2", - init_stream, - set_pollset, - perform_stream_op, - perform_transport_op, - destroy_stream, - destroy_transport, - chttp2_get_peer}; +static const grpc_transport_vtable vtable = { + sizeof(grpc_chttp2_stream), "chttp2", init_stream, set_pollset, + perform_stream_op, perform_transport_op, destroy_stream, destroy_transport, + chttp2_get_peer}; grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 5b5af0e7d0..b42980431a 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -133,8 +133,9 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx, void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream) { - transport->vtable->destroy_stream(exec_ctx, transport, stream); + grpc_stream *stream, void *and_free_memory) { + transport->vtable->destroy_stream(exec_ctx, transport, stream, + and_free_memory); } char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 908ef0bda4..ddb81f9a34 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -208,7 +208,7 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx, caller, but any child memory must be cleaned up) */ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream); + grpc_stream *stream, void *and_free_memory); void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op); diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index d9ecc4d2ba..9d53971f4d 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -63,7 +63,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream); + grpc_stream *stream, void *and_free_memory); /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self); diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index e19e9a57ae..ca7c57c94e 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -62,8 +62,8 @@ static void call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void channel_destroy_func(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} -static void call_destroy_func(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { ++*(int *)(elem->channel_data); } @@ -87,7 +87,7 @@ static void free_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } static void free_call(grpc_exec_ctx *exec_ctx, void *arg, bool success) { - grpc_call_stack_destroy(exec_ctx, arg); + grpc_call_stack_destroy(exec_ctx, arg, NULL); gpr_free(arg); } |