diff options
author | ncteisen <ncteisen@gmail.com> | 2018-08-02 20:55:00 -0700 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2018-08-03 11:13:52 -0400 |
commit | fde951db9c6f87597faa7ae1b3d0f521ff1adb51 (patch) | |
tree | 5d5e72bfe5cab16b506ab40210b0b08ecbc49a2a /src/core | |
parent | b35328f0127fda3f13b41ae91c885dc4be2dfb02 (diff) |
Intecept recv_trailing in client_channel for channelz
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 87 | ||||
-rw-r--r-- | src/core/lib/channel/connected_channel.cc | 12 | ||||
-rw-r--r-- | src/core/lib/surface/call.cc | 10 |
3 files changed, 91 insertions, 18 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index f4d5596270..c3669189f4 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -923,6 +923,10 @@ typedef struct client_channel_call_data { grpc_closure pick_closure; grpc_closure pick_cancel_closure; + grpc_closure recv_trailing_metadata_ready_channelz; + grpc_closure* original_recv_trailing_metadata; + // metadata_batch recv_trailing_metadata_channelz; + grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -984,6 +988,14 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); +template <typename Predicate> +static pending_batch* pending_batch_find(grpc_call_element* elem, + const char* log_message, + Predicate predicate); +static void get_call_status(grpc_call_element* elem, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status, + grpc_mdelem** server_pushback_md); // // send op data caching @@ -1258,6 +1270,59 @@ static void resume_pending_batch_in_call_combiner(void* arg, grpc_subchannel_call_process_op(subchannel_call, batch); } +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " + "error=%s", + chand, calld, grpc_error_string(error)); + } + // find the right pending batch. + pending_batch* pending = pending_batch_find( + elem, "invoking recv_trailing_metadata_channelz for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_trailing_metadata && + batch->payload->recv_trailing_metadata + .recv_trailing_metadata_ready != nullptr; + }); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = + pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + pending->batch = nullptr; + GRPC_CLOSURE_SCHED(calld->original_recv_trailing_metadata, error); +} + +static bool maybe_intercept_recv_trailing_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast<call_data*>(elem->call_data); + // only add interceptor is channelz is enabled. + if (calld->pick.connected_subchannel->channelz_subchannel() != nullptr) { + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; + return true; + } else { + return false; + } +} + // This is called via the call combiner, so access to calld is synchronized. static void pending_batches_resume(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); @@ -1282,13 +1347,17 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { + bool intercepted = + maybe_intercept_recv_trailing_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, "pending_batches_resume"); - pending_batch_clear(calld, pending); + if (!intercepted) { + pending_batch_clear(calld, pending); + } } } // Note: This will release the call combiner. @@ -1768,22 +1837,20 @@ static void recv_message_ready(void* arg, grpc_error* error) { // // Sets *status and *server_pushback_md based on batch_data and error. -static void get_call_status(subchannel_batch_data* batch_data, - grpc_error* error, grpc_status_code* status, +static void get_call_status(grpc_call_element* elem, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status, grpc_mdelem** server_pushback_md) { - grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast<call_data*>(elem->call_data); if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, nullptr); } else { - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); *status = grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + if (server_pushback_md != nullptr && + md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } } @@ -1956,7 +2023,9 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; grpc_mdelem* server_pushback_md = nullptr; - get_call_status(batch_data, GRPC_ERROR_REF(error), &status, + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, &server_pushback_md); grpc_core::channelz::SubchannelNode* channelz_subchannel = calld->pick.connected_subchannel->channelz_subchannel(); diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index e2ea334ded..90a0254663 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -104,18 +104,18 @@ static void con_start_transport_stream_op_batch( if (batch->recv_initial_metadata) { callback_state* state = &calld->recv_initial_metadata_ready; intercept_callback( - calld, state, false, "recv_initial_metadata_ready", + calld, state, false, "connected_recv_initial_metadata_ready", &batch->payload->recv_initial_metadata.recv_initial_metadata_ready); } if (batch->recv_message) { callback_state* state = &calld->recv_message_ready; - intercept_callback(calld, state, false, "recv_message_ready", + intercept_callback(calld, state, false, "connected_recv_message_ready", &batch->payload->recv_message.recv_message_ready); } if (batch->recv_trailing_metadata) { callback_state* state = &calld->recv_trailing_metadata_ready; intercept_callback( - calld, state, false, "recv_trailing_metadata_ready", + calld, state, false, "connected_recv_trailing_metadata_ready", &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready); } if (batch->cancel_stream) { @@ -126,11 +126,13 @@ static void con_start_transport_stream_op_batch( // closure for each one. callback_state* state = static_cast<callback_state*>(gpr_malloc(sizeof(*state))); - intercept_callback(calld, state, true, "on_complete (cancel_stream)", + intercept_callback(calld, state, true, + "connected_on_complete (cancel_stream)", &batch->on_complete); } else if (batch->on_complete != nullptr) { callback_state* state = get_state_for_batch(calld, batch); - intercept_callback(calld, state, false, "on_complete", &batch->on_complete); + intercept_callback(calld, state, false, "connected_on_complete", + &batch->on_complete); } grpc_transport_perform_stream_op( chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 26dd361e0d..859915affb 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1422,7 +1422,7 @@ static void receiving_stream_ready_in_call_combiner(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_recv_message_ready"); receiving_stream_ready(bctlp, error); } @@ -1507,7 +1507,8 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, + "call_recv_initial_metadata_ready"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { @@ -1558,7 +1559,8 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, + "call_recv_trailing_metadata_ready"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; @@ -1569,7 +1571,7 @@ static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_on_complete"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); finish_batch_step(bctl); } |