diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/channel_stack.c | 8 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 6 | ||||
-rw-r--r-- | src/core/surface/call.c | 9 | ||||
-rw-r--r-- | src/core/surface/channel.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 14 | ||||
-rw-r--r-- | src/core/transport/transport.c | 21 | ||||
-rw-r--r-- | src/core/transport/transport.h | 15 |
7 files changed, 57 insertions, 18 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 54b0375c25..559ad0a32c 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -116,8 +116,8 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, size_t i; stack->count = filter_count; - gpr_ref_init(&stack->refcount.refs, initial_refs); - grpc_closure_init(&stack->refcount.destroy, destroy, destroy_arg); + GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg, + "CHANNEL_STACK"); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + @@ -169,8 +169,8 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, size_t i; call_stack->count = count; - gpr_ref_init(&call_stack->refcount.refs, initial_refs); - grpc_closure_init(&call_stack->refcount.destroy, destroy, destroy_arg); + GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy, + destroy_arg, "CALL_STACK"); call_elems = CALL_ELEMS_FROM_STACK(call_stack); user_data = ((char *)call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index e093c3e9a9..1c9652fc47 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -363,9 +363,9 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { - if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) - continue; - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); + if (selected != grpc_subchannel_get_connected_subchannel(subchannels[i])) { + grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); + } GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } if (p->selected) { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 315eeb0449..1dac6efa11 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -739,8 +739,15 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, char *grpc_call_get_peer(grpc_call *call) { grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - char *result = elem->filter->get_peer(&exec_ctx, elem); + char *result; GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); + result = elem->filter->get_peer(&exec_ctx, elem); + if (result == NULL) { + result = grpc_channel_get_target(call->channel); + } + if (result == NULL) { + result = gpr_strdup("unknown"); + } grpc_exec_ctx_finish(&exec_ctx); return result; } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 7dea609ebc..3d2ff23542 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -53,7 +53,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); -#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, const char *reason); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index dedd208cbf..79ebaa70fe 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -898,10 +898,16 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); - if (op->on_connectivity_state_change) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change); + if (op->on_connectivity_state_change != NULL) { + if (op->connectivity_state != NULL) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); + } else { + grpc_connectivity_state_change_unsubscribe( + exec_ctx, &t->channel_callback.state_tracker, + op->on_connectivity_state_change); + } } if (op->send_goaway) { diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index f2bebc62f3..2ab978be46 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -40,8 +40,8 @@ #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); - gpr_log(GPR_DEBUG, "STREAM %p:%p REF %d->%d %s", refcount, - refcount->destroy.cb_arg, val, val + 1, reason); + gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type, + refcount, refcount->destroy.cb_arg, val, val + 1, reason); #else void grpc_stream_ref(grpc_stream_refcount *refcount) { #endif @@ -52,8 +52,8 @@ void grpc_stream_ref(grpc_stream_refcount *refcount) { void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, const char *reason) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); - gpr_log(GPR_DEBUG, "STREAM %p:%p UNREF %d->%d %s", refcount, - refcount->destroy.cb_arg, val, val - 1, reason); + gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type, + refcount, refcount->destroy.cb_arg, val, val - 1, reason); #else void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { @@ -63,6 +63,19 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, } } +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg, + const char *object_type) { + refcount->object_type = object_type; +#else +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg) { +#endif + gpr_ref_init(&refcount->refs, initial_refs); + grpc_closure_init(&refcount->destroy, cb, cb_arg); +} + size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f1059801d4..f94f0ae76e 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,19 +50,32 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; +/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ + typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + const char *object_type; +#endif } grpc_stream_refcount; -/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ #ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg, + const char *object_type); void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason); void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, const char *reason); +#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ + grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype) #else +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg); void grpc_stream_ref(grpc_stream_refcount *refcount); void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); +#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ + grpc_stream_ref_init(rc, ir, cb, cb_arg) #endif /* Transport stream op: a set of operations to perform on a transport |