diff options
author | ncteisen <ncteisen@gmail.com> | 2018-06-19 14:00:52 -0700 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2018-06-19 14:00:52 -0700 |
commit | 68d4f50f77641f80794134f0d4149034df1706f0 (patch) | |
tree | a8a6a2221c5b408e63d4c51b6cb9773ecca8e566 /src | |
parent | 4d1da600b525a514f0a5c1d768fa3a58d65ecded (diff) | |
parent | 9e3e64604d9a3bb4bea9202ede844e6f7b592eeb (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into channelz
Diffstat (limited to 'src')
81 files changed, 3340 insertions, 1303 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index ea6775a8d8..34ea97e23e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -817,6 +817,7 @@ typedef struct { // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; + grpc_closure recv_trailing_metadata_ready; // For intercepting on_complete. grpc_closure on_complete; } subchannel_batch_data; @@ -1192,35 +1193,24 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, num_batches, grpc_error_string(error)); } - grpc_transport_stream_op_batch* - batches[GPR_ARRAY_SIZE(calld->pending_batches)]; - size_t num_batches = 0; + grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - batches[num_batches++] = batch; + batch->handler_private.extra_arg = calld; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + fail_pending_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), + "pending_batches_fail"); pending_batch_clear(calld, pending); } } - for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) { - grpc_transport_stream_op_batch* batch = batches[i]; - batch->handler_private.extra_arg = calld; - GRPC_CLOSURE_INIT(&batch->handler_private.closure, - fail_pending_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(calld->call_combiner, - &batch->handler_private.closure, - GRPC_ERROR_REF(error), "pending_batches_fail"); - } if (yield_call_combiner) { - if (num_batches > 0) { - // Note: This will release the call combiner. - grpc_transport_stream_op_batch_finish_with_failure( - batches[0], GRPC_ERROR_REF(error), calld->call_combiner); - } else { - GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail"); - } + closures.RunClosures(calld->call_combiner); + } else { + closures.RunClosuresWithoutYielding(calld->call_combiner); } GRPC_ERROR_UNREF(error); } @@ -1255,30 +1245,22 @@ static void pending_batches_resume(grpc_call_element* elem) { " pending batches on subchannel_call=%p", chand, calld, num_batches, calld->subchannel_call); } - grpc_transport_stream_op_batch* - batches[GPR_ARRAY_SIZE(calld->pending_batches)]; - size_t num_batches = 0; + grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - batches[num_batches++] = batch; + batch->handler_private.extra_arg = calld->subchannel_call; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + resume_pending_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, + "pending_batches_resume"); pending_batch_clear(calld, pending); } } - for (size_t i = 1; i < num_batches; ++i) { - grpc_transport_stream_op_batch* batch = batches[i]; - batch->handler_private.extra_arg = calld->subchannel_call; - GRPC_CLOSURE_INIT(&batch->handler_private.closure, - resume_pending_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(calld->call_combiner, - &batch->handler_private.closure, GRPC_ERROR_NONE, - "pending_batches_resume"); - } - GPR_ASSERT(num_batches > 0); // Note: This will release the call combiner. - grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]); + closures.RunClosures(calld->call_combiner); } static void maybe_clear_pending_batch(grpc_call_element* elem, @@ -1293,7 +1275,10 @@ static void maybe_clear_pending_batch(grpc_call_element* elem, batch->payload->recv_initial_metadata.recv_initial_metadata_ready == nullptr) && (!batch->recv_message || - batch->payload->recv_message.recv_message_ready == nullptr)) { + batch->payload->recv_message.recv_message_ready == nullptr) && + (!batch->recv_trailing_metadata || + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready == + nullptr)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, calld); @@ -1302,75 +1287,27 @@ static void maybe_clear_pending_batch(grpc_call_element* elem, } } -// Returns true if all ops in the pending batch have been completed. -static bool pending_batch_is_completed( - pending_batch* pending, call_data* calld, - subchannel_call_retry_state* retry_state) { - if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { - return false; - } - if (pending->batch->send_initial_metadata && - !retry_state->completed_send_initial_metadata) { - return false; - } - if (pending->batch->send_message && - retry_state->completed_send_message_count < - calld->send_messages->size()) { - return false; - } - if (pending->batch->send_trailing_metadata && - !retry_state->completed_send_trailing_metadata) { - return false; - } - if (pending->batch->recv_initial_metadata && - !retry_state->completed_recv_initial_metadata) { - return false; - } - if (pending->batch->recv_message && - retry_state->completed_recv_message_count < - retry_state->started_recv_message_count) { - return false; - } - if (pending->batch->recv_trailing_metadata && - !retry_state->completed_recv_trailing_metadata) { - return false; - } - return true; -} - -// Returns true if any op in the batch was not yet started. -static bool pending_batch_is_unstarted( - pending_batch* pending, call_data* calld, - subchannel_call_retry_state* retry_state) { - if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { - return false; - } - if (pending->batch->send_initial_metadata && - !retry_state->started_send_initial_metadata) { - return true; - } - if (pending->batch->send_message && - retry_state->started_send_message_count < calld->send_messages->size()) { - return true; - } - if (pending->batch->send_trailing_metadata && - !retry_state->started_send_trailing_metadata) { - return true; - } - if (pending->batch->recv_initial_metadata && - !retry_state->started_recv_initial_metadata) { - return true; - } - if (pending->batch->recv_message && - retry_state->completed_recv_message_count == - retry_state->started_recv_message_count) { - return true; - } - if (pending->batch->recv_trailing_metadata && - !retry_state->started_recv_trailing_metadata) { - return true; +// Returns a pointer to the first pending batch for which predicate(batch) +// returns true, or null if not found. +template <typename Predicate> +static pending_batch* pending_batch_find(grpc_call_element* elem, + const char* log_message, + Predicate predicate) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch != nullptr && predicate(batch)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand, + calld, log_message, i); + } + return pending; + } } - return false; + return nullptr; } // @@ -1557,8 +1494,13 @@ static bool maybe_retry(grpc_call_element* elem, // subchannel_batch_data // +// Creates a subchannel_batch_data object on the call's arena with the +// specified refcount. If set_on_complete is true, the batch's +// on_complete callback will be set to point to on_complete(); +// otherwise, the batch's on_complete callback will be null. static subchannel_batch_data* batch_data_create(grpc_call_element* elem, - int refcount) { + int refcount, + bool set_on_complete) { call_data* calld = static_cast<call_data*>(elem->call_data); subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( @@ -1571,9 +1513,11 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem, GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create"); batch_data->batch.payload = &retry_state->batch_payload; gpr_ref_init(&batch_data->refs, refcount); - GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, - grpc_schedule_on_exec_ctx); - batch_data->batch.on_complete = &batch_data->on_complete; + if (set_on_complete) { + GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, + grpc_schedule_on_exec_ctx); + batch_data->batch.on_complete = &batch_data->on_complete; + } GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); return batch_data; } @@ -1606,26 +1550,14 @@ static void batch_data_unref(subchannel_batch_data* batch_data) { static void invoke_recv_initial_metadata_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); - channel_data* chand = - static_cast<channel_data*>(batch_data->elem->channel_data); - call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); // Find pending batch. - pending_batch* pending = nullptr; - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch; - if (batch != nullptr && batch->recv_initial_metadata && - batch->payload->recv_initial_metadata.recv_initial_metadata_ready != - nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: invoking recv_initial_metadata_ready for " - "pending batch at index %" PRIuPTR, - chand, calld, i); - } - pending = &calld->pending_batches[i]; - break; - } - } + pending_batch* pending = pending_batch_find( + batch_data->elem, "invoking recv_initial_metadata_ready for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_initial_metadata && + batch->payload->recv_initial_metadata + .recv_initial_metadata_ready != nullptr; + }); GPR_ASSERT(pending != nullptr); // Return metadata. grpc_metadata_batch_move( @@ -1661,10 +1593,19 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); + retry_state->completed_recv_initial_metadata = true; + // If a retry was already dispatched, then we're not going to use the + // result of this recv_initial_metadata op, so do nothing. + if (retry_state->retry_dispatched) { + GRPC_CALL_COMBINER_STOP( + calld->call_combiner, + "recv_initial_metadata_ready after retry dispatched"); + return; + } // If we got an error or a Trailers-Only response and have not yet gotten - // the recv_trailing_metadata on_complete callback, then defer - // propagating this callback back to the surface. We can evaluate whether - // to retry when recv_trailing_metadata comes back. + // the recv_trailing_metadata_ready callback, then defer propagating this + // callback back to the surface. We can evaluate whether to retry when + // recv_trailing_metadata comes back. if (GPR_UNLIKELY((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { @@ -1689,9 +1630,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { } // Received valid initial metadata, so commit the call. retry_commit(elem, retry_state); + // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_initial_metadata_callback(batch_data, error); - GRPC_ERROR_UNREF(error); } // @@ -1701,25 +1642,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { // Invokes recv_message_ready for a subchannel batch. static void invoke_recv_message_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); - channel_data* chand = - static_cast<channel_data*>(batch_data->elem->channel_data); - call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); // Find pending op. - pending_batch* pending = nullptr; - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch; - if (batch != nullptr && batch->recv_message && - batch->payload->recv_message.recv_message_ready != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: invoking recv_message_ready for " - "pending batch at index %" PRIuPTR, - chand, calld, i); - } - pending = &calld->pending_batches[i]; - break; - } - } + pending_batch* pending = pending_batch_find( + batch_data->elem, "invoking recv_message_ready for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_message && + batch->payload->recv_message.recv_message_ready != nullptr; + }); GPR_ASSERT(pending != nullptr); // Return payload. *pending->batch->payload->recv_message.recv_message = @@ -1751,10 +1680,18 @@ static void recv_message_ready(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); + ++retry_state->completed_recv_message_count; + // If a retry was already dispatched, then we're not going to use the + // result of this recv_message op, so do nothing. + if (retry_state->retry_dispatched) { + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "recv_message_ready after retry dispatched"); + return; + } // If we got an error or the payload was nullptr and we have not yet gotten - // the recv_trailing_metadata on_complete callback, then defer - // propagating this callback back to the surface. We can evaluate whether - // to retry when recv_trailing_metadata comes back. + // the recv_trailing_metadata_ready callback, then defer propagating this + // callback back to the surface. We can evaluate whether to retry when + // recv_trailing_metadata comes back. if (GPR_UNLIKELY( (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { @@ -1777,133 +1714,268 @@ static void recv_message_ready(void* arg, grpc_error* error) { } // Received a valid message, so commit the call. retry_commit(elem, retry_state); + // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_message_callback(batch_data, error); - GRPC_ERROR_UNREF(error); } // -// list of closures to execute in call combiner +// recv_trailing_metadata handling // -// Represents a closure that needs to run in the call combiner as part of -// starting or completing a batch. -typedef struct { - grpc_closure* closure; - grpc_error* error; - const char* reason; - bool free_reason = false; -} closure_to_execute; - -static void execute_closures_in_call_combiner(grpc_call_element* elem, - const char* caller, - closure_to_execute* closures, - size_t num_closures) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); +// Sets *status and *server_pushback_md based on batch_data and error. +static void get_call_status(subchannel_batch_data* batch_data, + grpc_error* error, grpc_status_code* status, + grpc_mdelem** server_pushback_md) { + grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast<call_data*>(elem->call_data); - // Note that the call combiner will be yielded for each closure that - // we schedule. We're already running in the call combiner, so one of - // the closures can be scheduled directly, but the others will - // have to re-enter the call combiner. - if (num_closures > 0) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand, - calld, caller, closures[0].reason); - } - GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error); - if (closures[0].free_reason) { - gpr_free(const_cast<char*>(closures[0].reason)); - } - for (size_t i = 1; i < num_closures; ++i) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: %s starting closure in call combiner: %s", - chand, calld, caller, closures[i].reason); - } - GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure, - closures[i].error, closures[i].reason); - if (closures[i].free_reason) { - gpr_free(const_cast<char*>(closures[i].reason)); - } - } + if (error != GRPC_ERROR_NONE) { + grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, + nullptr); } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand, - calld, caller); + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata; + GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); + *status = + grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); + if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } - GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run"); } + GRPC_ERROR_UNREF(error); } -// -// on_complete callback handling -// - -// Updates retry_state to reflect the ops completed in batch_data. -static void update_retry_state_for_completed_batch( - subchannel_batch_data* batch_data, - subchannel_call_retry_state* retry_state) { - if (batch_data->batch.send_initial_metadata) { - retry_state->completed_send_initial_metadata = true; - } - if (batch_data->batch.send_message) { - ++retry_state->completed_send_message_count; - } - if (batch_data->batch.send_trailing_metadata) { - retry_state->completed_send_trailing_metadata = true; - } - if (batch_data->batch.recv_initial_metadata) { - retry_state->completed_recv_initial_metadata = true; - } - if (batch_data->batch.recv_message) { - ++retry_state->completed_recv_message_count; - } - if (batch_data->batch.recv_trailing_metadata) { - retry_state->completed_recv_trailing_metadata = true; +// Adds recv_trailing_metadata_ready closure to closures. +static void add_closure_for_recv_trailing_metadata_ready( + grpc_call_element* elem, subchannel_batch_data* batch_data, + grpc_error* error, grpc_core::CallCombinerClosureList* closures) { + // Find pending batch. + pending_batch* pending = pending_batch_find( + elem, "invoking recv_trailing_metadata for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_trailing_metadata && + batch->payload->recv_trailing_metadata + .recv_trailing_metadata_ready != nullptr; + }); + // If we generated the recv_trailing_metadata op internally via + // start_internal_recv_trailing_metadata(), then there will be no + // pending batch. + if (pending == nullptr) { + GRPC_ERROR_UNREF(error); + return; } + // Return metadata. + grpc_metadata_batch_move( + &batch_data->recv_trailing_metadata, + pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); + // Add closure. + closures->Add(pending->batch->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + error, "recv_trailing_metadata_ready for pending batch"); + // Update bookkeeping. + pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + nullptr; + maybe_clear_pending_batch(elem, pending); } // Adds any necessary closures for deferred recv_initial_metadata and -// recv_message callbacks to closures, updating *num_closures as needed. +// recv_message callbacks to closures. static void add_closures_for_deferred_recv_callbacks( subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, - closure_to_execute* closures, size_t* num_closures) { + grpc_core::CallCombinerClosureList* closures) { if (batch_data->batch.recv_trailing_metadata) { // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != nullptr)) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = GRPC_CLOSURE_INIT( - &batch_data->recv_initial_metadata_ready, - invoke_recv_initial_metadata_callback, - retry_state->recv_initial_metadata_ready_deferred_batch, - grpc_schedule_on_exec_ctx); - closure->error = retry_state->recv_initial_metadata_error; - closure->reason = "resuming recv_initial_metadata_ready"; + GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, + invoke_recv_initial_metadata_callback, + retry_state->recv_initial_metadata_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closures->Add(&batch_data->recv_initial_metadata_ready, + retry_state->recv_initial_metadata_error, + "resuming recv_initial_metadata_ready"); retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; } // Add closure for deferred recv_message_ready. if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != nullptr)) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = GRPC_CLOSURE_INIT( - &batch_data->recv_message_ready, invoke_recv_message_callback, - retry_state->recv_message_ready_deferred_batch, - grpc_schedule_on_exec_ctx); - closure->error = retry_state->recv_message_error; - closure->reason = "resuming recv_message_ready"; + GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, + invoke_recv_message_callback, + retry_state->recv_message_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closures->Add(&batch_data->recv_message_ready, + retry_state->recv_message_error, + "resuming recv_message_ready"); retry_state->recv_message_ready_deferred_batch = nullptr; } } } +// Returns true if any op in the batch was not yet started. +// Only looks at send ops, since recv ops are always started immediately. +static bool pending_batch_is_unstarted( + pending_batch* pending, call_data* calld, + subchannel_call_retry_state* retry_state) { + if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !retry_state->started_send_initial_metadata) { + return true; + } + if (pending->batch->send_message && + retry_state->started_send_message_count < calld->send_messages->size()) { + return true; + } + if (pending->batch->send_trailing_metadata && + !retry_state->started_send_trailing_metadata) { + return true; + } + return false; +} + +// For any pending batch containing an op that has not yet been started, +// adds the pending batch's completion closures to closures. +static void add_closures_to_fail_unstarted_pending_batches( + grpc_call_element* elem, subchannel_call_retry_state* retry_state, + grpc_error* error, grpc_core::CallCombinerClosureList* closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + if (pending_batch_is_unstarted(pending, calld, retry_state)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: failing unstarted pending batch at index " + "%" PRIuPTR, + chand, calld, i); + } + closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), + "failing on_complete for pending batch"); + pending->batch->on_complete = nullptr; + maybe_clear_pending_batch(elem, pending); + } + } + GRPC_ERROR_UNREF(error); +} + +// Runs necessary closures upon completion of a call attempt. +static void run_closures_for_completed_call(subchannel_batch_data* batch_data, + grpc_error* error) { + grpc_call_element* elem = batch_data->elem; + call_data* calld = static_cast<call_data*>(elem->call_data); + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + // Construct list of closures to execute. + grpc_core::CallCombinerClosureList closures; + // First, add closure for recv_trailing_metadata_ready. + add_closure_for_recv_trailing_metadata_ready( + elem, batch_data, GRPC_ERROR_REF(error), &closures); + // If there are deferred recv_initial_metadata_ready or recv_message_ready + // callbacks, add them to closures. + add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); + // Add closures to fail any pending batches that have not yet been started. + add_closures_to_fail_unstarted_pending_batches( + elem, retry_state, GRPC_ERROR_REF(error), &closures); + // Don't need batch_data anymore. + batch_data_unref(batch_data); + // Schedule all of the closures identified above. + // Note: This will release the call combiner. + closures.RunClosures(calld->call_combiner); + GRPC_ERROR_UNREF(error); +} + +// Intercepts recv_trailing_metadata_ready callback for retries. +// Commits the call and returns the trailing metadata up the stack. +static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { + subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + grpc_call_element* elem = batch_data->elem; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", + chand, calld, grpc_error_string(error)); + } + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + retry_state->completed_recv_trailing_metadata = true; + // Get the call's status and check for server pushback metadata. + grpc_status_code status = GRPC_STATUS_OK; + grpc_mdelem* server_pushback_md = nullptr; + get_call_status(batch_data, GRPC_ERROR_REF(error), &status, + &server_pushback_md); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, + calld, grpc_status_code_to_string(status)); + } + // Check if we should retry. + if (maybe_retry(elem, batch_data, status, server_pushback_md)) { + // Unref batch_data for deferred recv_initial_metadata_ready or + // recv_message_ready callbacks, if any. + if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + } + if (retry_state->recv_message_ready_deferred_batch != nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_message_error); + } + batch_data_unref(batch_data); + return; + } + // Not retrying, so commit the call. + retry_commit(elem, retry_state); + // Run any necessary closures. + run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); +} + +// +// on_complete callback handling +// + +// Adds the on_complete closure for the pending batch completed in +// batch_data to closures. +static void add_closure_for_completed_pending_batch( + grpc_call_element* elem, subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state, grpc_error* error, + grpc_core::CallCombinerClosureList* closures) { + pending_batch* pending = pending_batch_find( + elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { + // Match the pending batch with the same set of send ops as the + // subchannel batch we've just completed. + return batch->on_complete != nullptr && + batch_data->batch.send_initial_metadata == + batch->send_initial_metadata && + batch_data->batch.send_message == batch->send_message && + batch_data->batch.send_trailing_metadata == + batch->send_trailing_metadata; + }); + // If batch_data is a replay batch, then there will be no pending + // batch to complete. + if (pending == nullptr) { + GRPC_ERROR_UNREF(error); + return; + } + // Add closure. + closures->Add(pending->batch->on_complete, error, + "on_complete for pending batch"); + pending->batch->on_complete = nullptr; + maybe_clear_pending_batch(elem, pending); +} + // If there are any cached ops to replay or pending ops to start on the // subchannel call, adds a closure to closures to invoke -// start_retriable_subchannel_batches(), updating *num_closures as needed. +// start_retriable_subchannel_batches(). static void add_closures_for_replay_or_pending_send_ops( grpc_call_element* elem, subchannel_batch_data* batch_data, - subchannel_call_retry_state* retry_state, closure_to_execute* closures, - size_t* num_closures) { + subchannel_call_retry_state* retry_state, + grpc_core::CallCombinerClosureList* closures) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); bool have_pending_send_message_ops = @@ -1929,93 +2001,12 @@ static void add_closures_for_replay_or_pending_send_ops( "chand=%p calld=%p: starting next batch for pending send op(s)", chand, calld); } - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = GRPC_CLOSURE_INIT( - &batch_data->batch.handler_private.closure, - start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx); - closure->error = GRPC_ERROR_NONE; - closure->reason = "starting next batch for send_* op(s)"; - } -} - -// For any pending batch completed in batch_data, adds the necessary -// completion closures to closures, updating *num_closures as needed. -static void add_closures_for_completed_pending_batches( - grpc_call_element* elem, subchannel_batch_data* batch_data, - subchannel_call_retry_state* retry_state, grpc_error* error, - closure_to_execute* closures, size_t* num_closures) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - pending_batch* pending = &calld->pending_batches[i]; - if (pending_batch_is_completed(pending, calld, retry_state)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: pending batch completed at index %" PRIuPTR, - chand, calld, i); - } - // Copy the trailing metadata to return it to the surface. - if (batch_data->batch.recv_trailing_metadata) { - grpc_metadata_batch_move(&batch_data->recv_trailing_metadata, - pending->batch->payload->recv_trailing_metadata - .recv_trailing_metadata); - } - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = pending->batch->on_complete; - closure->error = GRPC_ERROR_REF(error); - closure->reason = "on_complete for pending batch"; - pending->batch->on_complete = nullptr; - maybe_clear_pending_batch(elem, pending); - } + GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure, + start_retriable_subchannel_batches, elem, + grpc_schedule_on_exec_ctx); + closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE, + "starting next batch for send_* op(s)"); } - GRPC_ERROR_UNREF(error); -} - -// For any pending batch containing an op that has not yet been started, -// adds the pending batch's completion closures to closures, updating -// *num_closures as needed. -static void add_closures_to_fail_unstarted_pending_batches( - grpc_call_element* elem, subchannel_call_retry_state* retry_state, - grpc_error* error, closure_to_execute* closures, size_t* num_closures) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - pending_batch* pending = &calld->pending_batches[i]; - if (pending_batch_is_unstarted(pending, calld, retry_state)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: failing unstarted pending batch at index " - "%" PRIuPTR, - chand, calld, i); - } - if (pending->batch->recv_initial_metadata) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = pending->batch->payload->recv_initial_metadata - .recv_initial_metadata_ready; - closure->error = GRPC_ERROR_REF(error); - closure->reason = - "failing recv_initial_metadata_ready for pending batch"; - pending->batch->payload->recv_initial_metadata - .recv_initial_metadata_ready = nullptr; - } - if (pending->batch->recv_message) { - *pending->batch->payload->recv_message.recv_message = nullptr; - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = - pending->batch->payload->recv_message.recv_message_ready; - closure->error = GRPC_ERROR_REF(error); - closure->reason = "failing recv_message_ready for pending batch"; - pending->batch->payload->recv_message.recv_message_ready = nullptr; - } - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = pending->batch->on_complete; - closure->error = GRPC_ERROR_REF(error); - closure->reason = "failing on_complete for pending batch"; - pending->batch->on_complete = nullptr; - maybe_clear_pending_batch(elem, pending); - } - } - GRPC_ERROR_UNREF(error); } // Callback used to intercept on_complete from subchannel calls. @@ -2035,136 +2026,49 @@ static void on_complete(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); - // If we have previously completed recv_trailing_metadata, then the - // call is finished. - bool call_finished = retry_state->completed_recv_trailing_metadata; - // Record whether we were already committed before receiving this callback. - const bool previously_committed = calld->retry_committed; // Update bookkeeping in retry_state. - update_retry_state_for_completed_batch(batch_data, retry_state); - if (call_finished) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand, - calld); - } - } else { - // Check if this batch finished the call, and if so, get its status. - // The call is finished if either (a) this callback was invoked with - // an error or (b) we receive status. - grpc_status_code status = GRPC_STATUS_OK; - grpc_mdelem* server_pushback_md = nullptr; - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a). - call_finished = true; - grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, - nullptr); - } else if (batch_data->batch.recv_trailing_metadata) { // Case (b). - call_finished = true; - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; - GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); - status = grpc_get_status_code_from_metadata( - md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { - server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; - } - } - // If the call just finished, check if we should retry. - if (call_finished) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, - calld, grpc_status_code_to_string(status)); - } - if (maybe_retry(elem, batch_data, status, server_pushback_md)) { - // Unref batch_data for deferred recv_initial_metadata_ready or - // recv_message_ready callbacks, if any. - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_initial_metadata_ready_deferred_batch != - nullptr) { - batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); - } - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_message_ready_deferred_batch != nullptr) { - batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_message_error); - } - // Track number of pending subchannel send batches and determine if - // this was the last one. - bool last_callback_complete = false; - if (batch_data->batch.send_initial_metadata || - batch_data->batch.send_message || - batch_data->batch.send_trailing_metadata) { - --calld->num_pending_retriable_subchannel_send_batches; - last_callback_complete = - calld->num_pending_retriable_subchannel_send_batches == 0; - } - batch_data_unref(batch_data); - // If we just completed the last subchannel send batch, unref the - // call stack. - if (last_callback_complete) { - GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); - } - return; - } - // Not retrying, so commit the call. - retry_commit(elem, retry_state); - } + if (batch_data->batch.send_initial_metadata) { + retry_state->completed_send_initial_metadata = true; + } + if (batch_data->batch.send_message) { + ++retry_state->completed_send_message_count; } - // If we were already committed before receiving this callback, free - // cached data for send ops that we've just completed. (If the call has - // just now finished, the call to retry_commit() above will have freed all - // cached send ops, so we don't need to do it here.) - if (previously_committed) { + if (batch_data->batch.send_trailing_metadata) { + retry_state->completed_send_trailing_metadata = true; + } + // If the call is committed, free cached data for send ops that we've just + // completed. + if (calld->retry_committed) { free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); } - // Call not being retried. // Construct list of closures to execute. - // Max number of closures is number of pending batches plus one for - // each of: - // - recv_initial_metadata_ready (either deferred or unstarted) - // - recv_message_ready (either deferred or unstarted) - // - starting a new batch for pending send ops - closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3]; - size_t num_closures = 0; - // If there are deferred recv_initial_metadata_ready or recv_message_ready - // callbacks, add them to closures. - add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures, - &num_closures); - // Find pending batches whose ops are now complete and add their - // on_complete callbacks to closures. - add_closures_for_completed_pending_batches(elem, batch_data, retry_state, - GRPC_ERROR_REF(error), closures, - &num_closures); - // Add closures to handle any pending batches that have not yet been started. - // If the call is finished, we fail these batches; otherwise, we add a - // callback to start_retriable_subchannel_batches() to start them on - // the subchannel call. - if (call_finished) { - add_closures_to_fail_unstarted_pending_batches( - elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures); - } else { - add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, - closures, &num_closures); + grpc_core::CallCombinerClosureList closures; + // If a retry was already dispatched, that means we saw + // recv_trailing_metadata before this, so we do nothing here. + // Otherwise, invoke the callback to return the result to the surface. + if (!retry_state->retry_dispatched) { + // Add closure for the completed pending batch, if any. + add_closure_for_completed_pending_batch(elem, batch_data, retry_state, + GRPC_ERROR_REF(error), &closures); + // If needed, add a callback to start any replay or pending send ops on + // the subchannel call. + if (!retry_state->completed_recv_trailing_metadata) { + add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, + &closures); + } } // Track number of pending subchannel send batches and determine if this // was the last one. - bool last_callback_complete = false; - if (batch_data->batch.send_initial_metadata || - batch_data->batch.send_message || - batch_data->batch.send_trailing_metadata) { - --calld->num_pending_retriable_subchannel_send_batches; - last_callback_complete = - calld->num_pending_retriable_subchannel_send_batches == 0; - } + --calld->num_pending_retriable_subchannel_send_batches; + const bool last_send_batch_complete = + calld->num_pending_retriable_subchannel_send_batches == 0; // Don't need batch_data anymore. batch_data_unref(batch_data); // Schedule all of the closures identified above. // Note: This yeilds the call combiner. - execute_closures_in_call_combiner(elem, "on_complete", closures, - num_closures); - // If we just completed the last subchannel send batch, unref the call stack. - if (last_callback_complete) { + closures.RunClosures(calld->call_combiner); + // If this was the last subchannel send batch, unref the call stack. + if (last_send_batch_complete) { GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); } } @@ -2185,27 +2089,22 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { // Adds a closure to closures that will execute batch in the call combiner. static void add_closure_for_subchannel_batch( - call_data* calld, grpc_transport_stream_op_batch* batch, - closure_to_execute* closures, size_t* num_closures) { + grpc_call_element* elem, grpc_transport_stream_op_batch* batch, + grpc_core::CallCombinerClosureList* closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = &batch->handler_private.closure; - closure->error = GRPC_ERROR_NONE; - // If the tracer is enabled, we log a more detailed message, which - // requires dynamic allocation. This will be freed in - // start_retriable_subchannel_batches(). if (grpc_client_channel_trace.enabled()) { char* batch_str = grpc_transport_stream_op_batch_string(batch); - gpr_asprintf(const_cast<char**>(&closure->reason), - "starting batch in call combiner: %s", batch_str); + gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand, + calld, batch_str); gpr_free(batch_str); - closure->free_reason = true; - } else { - closure->reason = "start_subchannel_batch"; } + closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, + "start_subchannel_batch"); } // Adds retriable send_initial_metadata op to batch_data. @@ -2341,9 +2240,13 @@ static void add_retriable_recv_trailing_metadata_op( grpc_metadata_batch_init(&batch_data->recv_trailing_metadata); batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = &batch_data->recv_trailing_metadata; - batch_data->batch.collect_stats = true; - batch_data->batch.payload->collect_stats.collect_stats = + batch_data->batch.payload->recv_trailing_metadata.collect_stats = &batch_data->collect_stats; + GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, batch_data, + grpc_schedule_on_exec_ctx); + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready; } // Helper function used to start a recv_trailing_metadata batch. This @@ -2364,9 +2267,11 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Create batch_data with 2 refs, since this batch will be unreffed twice: - // once when the subchannel batch returns, and again when we actually get - // a recv_trailing_metadata op from the surface. - subchannel_batch_data* batch_data = batch_data_create(elem, 2); + // once for the recv_trailing_metadata_ready callback when the subchannel + // batch returns, and again when we actually get a recv_trailing_metadata + // op from the surface. + subchannel_batch_data* batch_data = + batch_data_create(elem, 2, false /* set_on_complete */); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. @@ -2391,7 +2296,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( "send_initial_metadata op", chand, calld); } - replay_batch_data = batch_data_create(elem, 1); + replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); add_retriable_send_initial_metadata_op(calld, retry_state, replay_batch_data); } @@ -2408,7 +2313,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( chand, calld); } if (replay_batch_data == nullptr) { - replay_batch_data = batch_data_create(elem, 1); + replay_batch_data = + batch_data_create(elem, 1, true /* set_on_complete */); } add_retriable_send_message_op(elem, retry_state, replay_batch_data); } @@ -2427,7 +2333,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( chand, calld); } if (replay_batch_data == nullptr) { - replay_batch_data = batch_data_create(elem, 1); + replay_batch_data = + batch_data_create(elem, 1, true /* set_on_complete */); } add_retriable_send_trailing_metadata_op(calld, retry_state, replay_batch_data); @@ -2439,7 +2346,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // *num_batches as needed. static void add_subchannel_batches_for_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, - closure_to_execute* closures, size_t* num_closures) { + grpc_core::CallCombinerClosureList* closures) { call_data* calld = static_cast<call_data*>(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; @@ -2495,13 +2402,11 @@ static void add_subchannel_batches_for_pending_batches( if (retry_state->completed_recv_trailing_metadata) { subchannel_batch_data* batch_data = retry_state->recv_trailing_metadata_internal_batch; - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = &batch_data->on_complete; // Batches containing recv_trailing_metadata always succeed. - closure->error = GRPC_ERROR_NONE; - closure->reason = - "re-executing on_complete for recv_trailing_metadata " - "to propagate internally triggered result"; + closures->Add( + &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE, + "re-executing recv_trailing_metadata_ready to propagate " + "internally triggered result"); } else { batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); } @@ -2513,14 +2418,19 @@ static void add_subchannel_batches_for_pending_batches( if (calld->method_params == nullptr || calld->method_params->retry_policy() == nullptr || calld->retry_committed) { - add_closure_for_subchannel_batch(calld, batch, closures, num_closures); + add_closure_for_subchannel_batch(elem, batch, closures); pending_batch_clear(calld, pending); continue; } // Create batch with the right number of callbacks. - const int num_callbacks = - 1 + batch->recv_initial_metadata + batch->recv_message; - subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks); + const bool has_send_ops = batch->send_initial_metadata || + batch->send_message || + batch->send_trailing_metadata; + const int num_callbacks = has_send_ops + batch->recv_initial_metadata + + batch->recv_message + + batch->recv_trailing_metadata; + subchannel_batch_data* batch_data = batch_data_create( + elem, num_callbacks, has_send_ops /* set_on_complete */); // Cache send ops if needed. maybe_cache_send_ops_for_batch(calld, pending); // send_initial_metadata. @@ -2547,11 +2457,9 @@ static void add_subchannel_batches_for_pending_batches( } // recv_trailing_metadata. if (batch->recv_trailing_metadata) { - GPR_ASSERT(batch->collect_stats); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); } - add_closure_for_subchannel_batch(calld, &batch_data->batch, closures, - num_closures); + add_closure_for_subchannel_batch(elem, &batch_data->batch, closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (batch->send_initial_metadata || batch->send_message || @@ -2579,15 +2487,13 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Construct list of closures to execute, one for each pending batch. - // We can start up to 6 batches. - closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)]; - size_t num_closures = 0; + grpc_core::CallCombinerClosureList closures; // Replay previously-returned send_* ops if needed. subchannel_batch_data* replay_batch_data = maybe_create_subchannel_batch_for_replay(elem, retry_state); if (replay_batch_data != nullptr) { - add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures, - &num_closures); + add_closure_for_subchannel_batch(elem, &replay_batch_data->batch, + &closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (calld->num_pending_retriable_subchannel_send_batches == 0) { @@ -2596,17 +2502,16 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { ++calld->num_pending_retriable_subchannel_send_batches; } // Now add pending batches. - add_subchannel_batches_for_pending_batches(elem, retry_state, closures, - &num_closures); + add_subchannel_batches_for_pending_batches(elem, retry_state, &closures); // Start batches on subchannel call. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", - chand, calld, num_closures, calld->subchannel_call); + chand, calld, closures.size(), calld->subchannel_call); } - execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches", - closures, num_closures); + // Note: This will yield the call combiner. + closures.RunClosures(calld->call_combiner); } // diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc new file mode 100644 index 0000000000..06a6e853f5 --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -0,0 +1,311 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) + +#include <ares.h> +#include <string.h> +#include <sys/ioctl.h> + +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" + +typedef struct fd_node { + /** the owner of this fd node */ + grpc_ares_ev_driver* ev_driver; + /** a closure wrapping on_readable_locked, which should be + invoked when the grpc_fd in this node becomes readable. */ + grpc_closure read_closure; + /** a closure wrapping on_writable_locked, which should be + invoked when the grpc_fd in this node becomes writable. */ + grpc_closure write_closure; + /** next fd node in the list */ + struct fd_node* next; + + /** wrapped fd that's polled by grpc's poller for the current platform */ + grpc_core::GrpcPolledFd* grpc_polled_fd; + /** if the readable closure has been registered */ + bool readable_registered; + /** if the writable closure has been registered */ + bool writable_registered; + /** if the fd has been shutdown yet from grpc iomgr perspective */ + bool already_shutdown; +} fd_node; + +struct grpc_ares_ev_driver { + /** the ares_channel owned by this event driver */ + ares_channel channel; + /** pollset set for driving the IO events of the channel */ + grpc_pollset_set* pollset_set; + /** refcount of the event driver */ + gpr_refcount refs; + + /** combiner to synchronize c-ares and I/O callbacks on */ + grpc_combiner* combiner; + /** a list of grpc_fd that this event driver is currently using. */ + fd_node* fds; + /** is this event driver currently working? */ + bool working; + /** is this event driver being shut down */ + bool shutting_down; +}; + +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); + +static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( + grpc_ares_ev_driver* ev_driver) { + gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + gpr_ref(&ev_driver->refs); + return ev_driver; +} + +static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { + gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + if (gpr_unref(&ev_driver->refs)) { + gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GPR_ASSERT(ev_driver->fds == nullptr); + GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); + ares_destroy(ev_driver->channel); + gpr_free(ev_driver); + } +} + +static void fd_node_destroy_locked(fd_node* fdn) { + gpr_log(GPR_DEBUG, "delete fd: %s", fdn->grpc_polled_fd->GetName()); + GPR_ASSERT(!fdn->readable_registered); + GPR_ASSERT(!fdn->writable_registered); + GPR_ASSERT(fdn->already_shutdown); + grpc_core::Delete(fdn->grpc_polled_fd); + gpr_free(fdn); +} + +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { + if (!fdn->already_shutdown) { + fdn->already_shutdown = true; + fdn->grpc_polled_fd->ShutdownLocked( + GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); + } +} + +grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, + grpc_pollset_set* pollset_set, + grpc_combiner* combiner) { + *ev_driver = static_cast<grpc_ares_ev_driver*>( + gpr_malloc(sizeof(grpc_ares_ev_driver))); + ares_options opts; + memset(&opts, 0, sizeof(opts)); + opts.flags |= ARES_FLAG_STAYOPEN; + int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); + grpc_core::ConfigureAresChannelLocked(&(*ev_driver)->channel); + gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); + if (status != ARES_SUCCESS) { + char* err_msg; + gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", + ares_strerror(status)); + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg); + gpr_free(err_msg); + gpr_free(*ev_driver); + return err; + } + (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver"); + gpr_ref_init(&(*ev_driver)->refs, 1); + (*ev_driver)->pollset_set = pollset_set; + (*ev_driver)->fds = nullptr; + (*ev_driver)->working = false; + (*ev_driver)->shutting_down = false; + return GRPC_ERROR_NONE; +} + +void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { + // We mark the event driver as being shut down. If the event driver + // is working, grpc_ares_notify_on_event_locked will shut down the + // fds; if it's not working, there are no fds to shut down. + ev_driver->shutting_down = true; + grpc_ares_ev_driver_unref(ev_driver); +} + +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { + ev_driver->shutting_down = true; + fd_node* fn = ev_driver->fds; + while (fn != nullptr) { + fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); + fn = fn->next; + } +} + +// Search fd in the fd_node list head. This is an O(n) search, the max possible +// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. +static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { + fd_node dummy_head; + dummy_head.next = *head; + fd_node* node = &dummy_head; + while (node->next != nullptr) { + if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) { + fd_node* ret = node->next; + node->next = node->next->next; + *head = dummy_head.next; + return ret; + } + node = node->next; + } + return nullptr; +} + +static void on_readable_locked(void* arg, grpc_error* error) { + fd_node* fdn = static_cast<fd_node*>(arg); + grpc_ares_ev_driver* ev_driver = fdn->ev_driver; + const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + fdn->readable_registered = false; + gpr_log(GPR_DEBUG, "readable on %s", fdn->grpc_polled_fd->GetName()); + if (error == GRPC_ERROR_NONE) { + do { + ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); + } while (fdn->grpc_polled_fd->IsFdStillReadableLocked()); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); + } + grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_ev_driver_unref(ev_driver); +} + +static void on_writable_locked(void* arg, grpc_error* error) { + fd_node* fdn = static_cast<fd_node*>(arg); + grpc_ares_ev_driver* ev_driver = fdn->ev_driver; + const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + fdn->writable_registered = false; + gpr_log(GPR_DEBUG, "writable on %s", fdn->grpc_polled_fd->GetName()); + if (error == GRPC_ERROR_NONE) { + ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); + } + grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_ev_driver_unref(ev_driver); +} + +ares_channel* grpc_ares_ev_driver_get_channel_locked( + grpc_ares_ev_driver* ev_driver) { + return &ev_driver->channel; +} + +// Get the file descriptors used by the ev_driver's ares channel, register +// driver_closure with these filedescriptors. +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { + fd_node* new_list = nullptr; + if (!ev_driver->shutting_down) { + ares_socket_t socks[ARES_GETSOCK_MAXNUM]; + int socks_bitmask = + ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); + for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { + if (ARES_GETSOCK_READABLE(socks_bitmask, i) || + ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { + fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); + // Create a new fd_node if sock[i] is not in the fd_node list. + if (fdn == nullptr) { + fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node))); + fdn->grpc_polled_fd = grpc_core::NewGrpcPolledFdLocked( + socks[i], ev_driver->pollset_set); + gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName()); + fdn->ev_driver = ev_driver; + fdn->readable_registered = false; + fdn->writable_registered = false; + fdn->already_shutdown = false; + GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + } + fdn->next = new_list; + new_list = fdn; + // Register read_closure if the socket is readable and read_closure has + // not been registered with this socket. + if (ARES_GETSOCK_READABLE(socks_bitmask, i) && + !fdn->readable_registered) { + grpc_ares_ev_driver_ref(ev_driver); + gpr_log(GPR_DEBUG, "notify read on: %s", + fdn->grpc_polled_fd->GetName()); + fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure); + fdn->readable_registered = true; + } + // Register write_closure if the socket is writable and write_closure + // has not been registered with this socket. + if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && + !fdn->writable_registered) { + gpr_log(GPR_DEBUG, "notify write on: %s", + fdn->grpc_polled_fd->GetName()); + grpc_ares_ev_driver_ref(ev_driver); + fdn->grpc_polled_fd->RegisterForOnWriteableLocked( + &fdn->write_closure); + fdn->writable_registered = true; + } + } + } + } + // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and + // are therefore no longer in use, so they can be shut down and removed from + // the list. + while (ev_driver->fds != nullptr) { + fd_node* cur = ev_driver->fds; + ev_driver->fds = ev_driver->fds->next; + fd_node_shutdown_locked(cur, "c-ares fd shutdown"); + if (!cur->readable_registered && !cur->writable_registered) { + fd_node_destroy_locked(cur); + } else { + cur->next = new_list; + new_list = cur; + } + } + ev_driver->fds = new_list; + // If the ev driver has no working fd, all the tasks are done. + if (new_list == nullptr) { + ev_driver->working = false; + gpr_log(GPR_DEBUG, "ev driver stop working"); + } +} + +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { + if (!ev_driver->working) { + ev_driver->working = true; + grpc_ares_notify_on_event_locked(ev_driver); + } +} + +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 27d1511d94..7002c8f95f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -22,6 +22,7 @@ #include <grpc/support/port_platform.h> #include <ares.h> +#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/iomgr/pollset_set.h" typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; @@ -51,5 +52,40 @@ void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver); /* Shutdown all the grpc_fds used by \a ev_driver */ void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver); +namespace grpc_core { + +/* A wrapped fd that integrates with the grpc iomgr of the current platform. + * A GrpcPolledFd knows how to create grpc platform-specific iomgr endpoints + * from "ares_socket_t" sockets, and then sign up for readability/writeability + * with that poller, and do shutdown and destruction. */ +class GrpcPolledFd { + public: + virtual ~GrpcPolledFd() {} + /* Called when c-ares library is interested and there's no pending callback */ + virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) + GRPC_ABSTRACT; + /* Called when c-ares library is interested and there's no pending callback */ + virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) + GRPC_ABSTRACT; + /* Indicates if there is data left even after just being read from */ + virtual bool IsFdStillReadableLocked() GRPC_ABSTRACT; + /* Called once and only once. Must cause cancellation of any pending + * read/write callbacks. */ + virtual void ShutdownLocked(grpc_error* error) GRPC_ABSTRACT; + /* Get the underlying ares_socket_t that this was created from */ + virtual ares_socket_t GetWrappedAresSocketLocked() GRPC_ABSTRACT; + /* A unique name, for logging */ + virtual const char* GetName() GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; + +/* Creates a new wrapped fd for the current platform */ +GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, + grpc_pollset_set* driver_pollset_set); +void ConfigureAresChannelLocked(ares_channel* channel); + +} // namespace grpc_core + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index b73e979e9f..5db832baf8 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -36,286 +36,60 @@ #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -typedef struct fd_node { - /** the owner of this fd node */ - grpc_ares_ev_driver* ev_driver; - /** a closure wrapping on_readable_locked, which should be - invoked when the grpc_fd in this node becomes readable. */ - grpc_closure read_closure; - /** a closure wrapping on_writable_locked, which should be - invoked when the grpc_fd in this node becomes writable. */ - grpc_closure write_closure; - /** next fd node in the list */ - struct fd_node* next; - - /** the grpc_fd owned by this fd node */ - grpc_fd* fd; - /** if the readable closure has been registered */ - bool readable_registered; - /** if the writable closure has been registered */ - bool writable_registered; - /** if the fd has been shutdown yet from grpc iomgr perspective */ - bool already_shutdown; -} fd_node; - -struct grpc_ares_ev_driver { - /** the ares_channel owned by this event driver */ - ares_channel channel; - /** pollset set for driving the IO events of the channel */ - grpc_pollset_set* pollset_set; - /** refcount of the event driver */ - gpr_refcount refs; - - /** combiner to synchronize c-ares and I/O callbacks on */ - grpc_combiner* combiner; - /** a list of grpc_fd that this event driver is currently using. */ - fd_node* fds; - /** is this event driver currently working? */ - bool working; - /** is this event driver being shut down */ - bool shutting_down; -}; - -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); - -static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( - grpc_ares_ev_driver* ev_driver) { - gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); - gpr_ref(&ev_driver->refs); - return ev_driver; -} - -static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { - gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); - if (gpr_unref(&ev_driver->refs)) { - gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); - GPR_ASSERT(ev_driver->fds == nullptr); - GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); - ares_destroy(ev_driver->channel); - gpr_free(ev_driver); +namespace grpc_core { + +class GrpcPolledFdPosix : public GrpcPolledFd { + public: + GrpcPolledFdPosix(ares_socket_t as, grpc_pollset_set* driver_pollset_set) + : as_(as) { + gpr_asprintf(&name_, "c-ares fd: %d", (int)as); + fd_ = grpc_fd_create((int)as, name_, false); + grpc_pollset_set_add_fd(driver_pollset_set, fd_); } -} - -static void fd_node_destroy_locked(fd_node* fdn) { - gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); - GPR_ASSERT(!fdn->readable_registered); - GPR_ASSERT(!fdn->writable_registered); - GPR_ASSERT(fdn->already_shutdown); - /* c-ares library will close the fd inside grpc_fd. This fd may be picked up - immediately by another thread, and should not be closed by the following - grpc_fd_orphan. */ - int dummy_release_fd; - grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished"); - gpr_free(fdn); -} -static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { - if (!fdn->already_shutdown) { - fdn->already_shutdown = true; - grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); + ~GrpcPolledFdPosix() { + gpr_free(name_); + /* c-ares library will close the fd inside grpc_fd. This fd may be picked up + immediately by another thread, and should not be closed by the following + grpc_fd_orphan. */ + int dummy_release_fd; + grpc_fd_orphan(fd_, nullptr, &dummy_release_fd, "c-ares query finished"); } -} -grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, - grpc_pollset_set* pollset_set, - grpc_combiner* combiner) { - *ev_driver = static_cast<grpc_ares_ev_driver*>( - gpr_malloc(sizeof(grpc_ares_ev_driver))); - ares_options opts; - memset(&opts, 0, sizeof(opts)); - opts.flags |= ARES_FLAG_STAYOPEN; - int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); - gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); - if (status != ARES_SUCCESS) { - char* err_msg; - gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", - ares_strerror(status)); - grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg); - gpr_free(err_msg); - gpr_free(*ev_driver); - return err; + void RegisterForOnReadableLocked(grpc_closure* read_closure) override { + grpc_fd_notify_on_read(fd_, read_closure); } - (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver"); - gpr_ref_init(&(*ev_driver)->refs, 1); - (*ev_driver)->pollset_set = pollset_set; - (*ev_driver)->fds = nullptr; - (*ev_driver)->working = false; - (*ev_driver)->shutting_down = false; - return GRPC_ERROR_NONE; -} -void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { - // We mark the event driver as being shut down. If the event driver - // is working, grpc_ares_notify_on_event_locked will shut down the - // fds; if it's not working, there are no fds to shut down. - ev_driver->shutting_down = true; - grpc_ares_ev_driver_unref(ev_driver); -} + void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { + grpc_fd_notify_on_write(fd_, write_closure); + } -void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { - ev_driver->shutting_down = true; - fd_node* fn = ev_driver->fds; - while (fn != nullptr) { - fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); - fn = fn->next; + bool IsFdStillReadableLocked() override { + size_t bytes_available = 0; + return ioctl(grpc_fd_wrapped_fd(fd_), FIONREAD, &bytes_available) == 0 && + bytes_available > 0; } -} -// Search fd in the fd_node list head. This is an O(n) search, the max possible -// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. -static fd_node* pop_fd_node_locked(fd_node** head, int fd) { - fd_node dummy_head; - dummy_head.next = *head; - fd_node* node = &dummy_head; - while (node->next != nullptr) { - if (grpc_fd_wrapped_fd(node->next->fd) == fd) { - fd_node* ret = node->next; - node->next = node->next->next; - *head = dummy_head.next; - return ret; - } - node = node->next; + void ShutdownLocked(grpc_error* error) override { + grpc_fd_shutdown(fd_, error); } - return nullptr; -} -/* Check if \a fd is still readable */ -static bool grpc_ares_is_fd_still_readable_locked( - grpc_ares_ev_driver* ev_driver, int fd) { - size_t bytes_available = 0; - return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; -} + ares_socket_t GetWrappedAresSocketLocked() override { return as_; } -static void on_readable_locked(void* arg, grpc_error* error) { - fd_node* fdn = static_cast<fd_node*>(arg); - grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - const int fd = grpc_fd_wrapped_fd(fdn->fd); - fdn->readable_registered = false; - gpr_log(GPR_DEBUG, "readable on %d", fd); - if (error == GRPC_ERROR_NONE) { - do { - ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); - } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd)); - } else { - // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or - // timed out. The pending lookups made on this ev_driver will be cancelled - // by the following ares_cancel() and the on_done callbacks will be invoked - // with a status of ARES_ECANCELLED. The remaining file descriptors in this - // ev_driver will be cleaned up in the follwing - // grpc_ares_notify_on_event_locked(). - ares_cancel(ev_driver->channel); - } - grpc_ares_notify_on_event_locked(ev_driver); - grpc_ares_ev_driver_unref(ev_driver); -} + const char* GetName() override { return name_; } -static void on_writable_locked(void* arg, grpc_error* error) { - fd_node* fdn = static_cast<fd_node*>(arg); - grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - const int fd = grpc_fd_wrapped_fd(fdn->fd); - fdn->writable_registered = false; - gpr_log(GPR_DEBUG, "writable on %d", fd); - if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); - } else { - // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or - // timed out. The pending lookups made on this ev_driver will be cancelled - // by the following ares_cancel() and the on_done callbacks will be invoked - // with a status of ARES_ECANCELLED. The remaining file descriptors in this - // ev_driver will be cleaned up in the follwing - // grpc_ares_notify_on_event_locked(). - ares_cancel(ev_driver->channel); - } - grpc_ares_notify_on_event_locked(ev_driver); - grpc_ares_ev_driver_unref(ev_driver); -} + char* name_; + ares_socket_t as_; + grpc_fd* fd_; +}; -ares_channel* grpc_ares_ev_driver_get_channel_locked( - grpc_ares_ev_driver* ev_driver) { - return &ev_driver->channel; +GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, + grpc_pollset_set* driver_pollset_set) { + return grpc_core::New<GrpcPolledFdPosix>(as, driver_pollset_set); } -// Get the file descriptors used by the ev_driver's ares channel, register -// driver_closure with these filedescriptors. -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { - fd_node* new_list = nullptr; - if (!ev_driver->shutting_down) { - ares_socket_t socks[ARES_GETSOCK_MAXNUM]; - int socks_bitmask = - ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); - for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - if (ARES_GETSOCK_READABLE(socks_bitmask, i) || - ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); - // Create a new fd_node if sock[i] is not in the fd_node list. - if (fdn == nullptr) { - char* fd_name; - gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); - fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node))); - gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->fd = grpc_fd_create(socks[i], fd_name, false); - fdn->ev_driver = ev_driver; - fdn->readable_registered = false; - fdn->writable_registered = false; - fdn->already_shutdown = false; - GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, - grpc_combiner_scheduler(ev_driver->combiner)); - GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, - grpc_combiner_scheduler(ev_driver->combiner)); - grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd); - gpr_free(fd_name); - } - fdn->next = new_list; - new_list = fdn; - // Register read_closure if the socket is readable and read_closure has - // not been registered with this socket. - if (ARES_GETSOCK_READABLE(socks_bitmask, i) && - !fdn->readable_registered) { - grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd)); - grpc_fd_notify_on_read(fdn->fd, &fdn->read_closure); - fdn->readable_registered = true; - } - // Register write_closure if the socket is writable and write_closure - // has not been registered with this socket. - if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && - !fdn->writable_registered) { - gpr_log(GPR_DEBUG, "notify write on: %d", - grpc_fd_wrapped_fd(fdn->fd)); - grpc_ares_ev_driver_ref(ev_driver); - grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure); - fdn->writable_registered = true; - } - } - } - } - // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and - // are therefore no longer in use, so they can be shut down and removed from - // the list. - while (ev_driver->fds != nullptr) { - fd_node* cur = ev_driver->fds; - ev_driver->fds = ev_driver->fds->next; - fd_node_shutdown_locked(cur, "c-ares fd shutdown"); - if (!cur->readable_registered && !cur->writable_registered) { - fd_node_destroy_locked(cur); - } else { - cur->next = new_list; - new_list = cur; - } - } - ev_driver->fds = new_list; - // If the ev driver has no working fd, all the tasks are done. - if (new_list == nullptr) { - ev_driver->working = false; - gpr_log(GPR_DEBUG, "ev driver stop working"); - } -} +void ConfigureAresChannelLocked(ares_channel* channel) {} -void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { - if (!ev_driver->working) { - ev_driver->working = true; - grpc_ares_notify_on_event_locked(ev_driver); - } -} +} // namespace grpc_core #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index e0a41a3637..d23ad67ad5 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -128,21 +128,25 @@ static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) { } } -// Callback run when the call is complete. -static void on_complete(void* arg, grpc_error* error) { +// Callback run when we receive trailing metadata. +static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg); cancel_timer_if_needed(deadline_state); - // Invoke the next callback. - GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error)); + // Invoke the original callback. + GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready, + GRPC_ERROR_REF(error)); } -// Inject our own on_complete callback into op. -static void inject_on_complete_cb(grpc_deadline_state* deadline_state, - grpc_transport_stream_op_batch* op) { - deadline_state->next_on_complete = op->on_complete; - GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state, +// Inject our own recv_trailing_metadata_ready callback into op. +static void inject_recv_trailing_metadata_ready( + grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) { + deadline_state->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, deadline_state, grpc_schedule_on_exec_ctx); - op->on_complete = &deadline_state->on_complete; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &deadline_state->recv_trailing_metadata_ready; } // Callback and associated state for starting the timer after call stack @@ -226,7 +230,7 @@ void grpc_deadline_state_client_start_transport_stream_op_batch( // Make sure we know when the call is complete, so that we can cancel // the timer. if (op->recv_trailing_metadata) { - inject_on_complete_cb(deadline_state, op); + inject_recv_trailing_metadata_ready(deadline_state, op); } } } @@ -322,7 +326,7 @@ static void server_start_transport_stream_op_batch( // the client never sends trailing metadata, because this is the // hook that tells us when the call is complete on the server side. if (op->recv_trailing_metadata) { - inject_on_complete_cb(&calld->base.deadline_state, op); + inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op); } } // Chain to next filter. diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h index 13207cbd6f..1d797f445a 100644 --- a/src/core/ext/filters/deadline/deadline_filter.h +++ b/src/core/ext/filters/deadline/deadline_filter.h @@ -37,12 +37,12 @@ typedef struct grpc_deadline_state { grpc_deadline_timer_state timer_state; grpc_timer timer; grpc_closure timer_callback; - // Closure to invoke when the call is complete. + // Closure to invoke when we receive trailing metadata. // We use this to cancel the timer. - grpc_closure on_complete; - // The original on_complete closure, which we chain to after our own - // closure is invoked. - grpc_closure* next_on_complete; + grpc_closure recv_trailing_metadata_ready; + // The original recv_trailing_metadata_ready closure, which we chain to + // after our own closure is invoked. + grpc_closure* original_recv_trailing_metadata_ready; } grpc_deadline_state; // diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index ae94ce47b9..1678051beb 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -55,8 +55,8 @@ struct call_data { grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. grpc_metadata_batch* recv_trailing_metadata; - grpc_closure* original_recv_trailing_metadata_on_complete; - grpc_closure recv_trailing_metadata_on_complete; + grpc_closure* original_recv_trailing_metadata_ready; + grpc_closure recv_trailing_metadata_ready; // State for handling send_message ops. grpc_transport_stream_op_batch* send_message_batch; size_t send_message_bytes_read; @@ -153,8 +153,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error); } -static void recv_trailing_metadata_on_complete(void* user_data, - grpc_error* error) { +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (error == GRPC_ERROR_NONE) { @@ -163,7 +162,7 @@ static void recv_trailing_metadata_on_complete(void* user_data, } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } static void send_message_on_complete(void* arg, grpc_error* error) { @@ -312,8 +311,10 @@ static void hc_start_transport_stream_op_batch( /* substitute our callback for the higher callback */ calld->recv_trailing_metadata = batch->payload->recv_trailing_metadata.recv_trailing_metadata; - calld->original_recv_trailing_metadata_on_complete = batch->on_complete; - batch->on_complete = &calld->recv_trailing_metadata_on_complete; + calld->original_recv_trailing_metadata_ready = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; } grpc_error* error = GRPC_ERROR_NONE; @@ -420,8 +421,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem, GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete, - recv_trailing_metadata_on_complete, elem, + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, elem, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index a8090d18a6..0d6b72c66e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1149,12 +1149,10 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) { } } -/* Flag that this closure barrier wants stats to be updated before finishing */ -#define CLOSURE_BARRIER_STATS_BIT (1 << 0) /* Flag that this closure barrier may be covering a write in a pollset, and so we should not complete this closure until we can prove that the write got scheduled */ -#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1) +#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0) /* First bit of the reference count, stored in the high order bits (with the low bits being used for flags defined above) */ #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) @@ -1206,10 +1204,6 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, grpc_error_add_child(closure->error_data.error, error); } if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) { - if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) { - grpc_transport_move_stats(&s->stats, s->collecting_stats); - s->collecting_stats = nullptr; - } if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) || !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { GRPC_CLOSURE_RUN(closure, closure->error_data.error); @@ -1351,9 +1345,14 @@ static void perform_stream_op_locked(void* stream_op, } grpc_closure* on_complete = op->on_complete; + // TODO(roth): This is a hack needed because we use data inside of the + // closure itself to do the barrier calculation (i.e., to ensure that + // we don't schedule the closure until all ops in the batch have been + // completed). This can go away once we move to a new C++ closure API + // that provides the ability to create a barrier closure. if (on_complete == nullptr) { - on_complete = - GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx); + on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing, + nullptr, grpc_schedule_on_exec_ctx); } /* use final_data as a barrier until enqueue time; the inital counter is @@ -1361,12 +1360,6 @@ static void perform_stream_op_locked(void* stream_op, on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT; on_complete->error_data.error = GRPC_ERROR_NONE; - if (op->collect_stats) { - GPR_ASSERT(s->collecting_stats == nullptr); - s->collecting_stats = op_payload->collect_stats.collect_stats; - on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT; - } - if (op->cancel_stream) { GRPC_STATS_INC_HTTP2_OP_CANCEL(); grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error); @@ -1600,8 +1593,11 @@ static void perform_stream_op_locked(void* stream_op, if (op->recv_trailing_metadata) { GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(); + GPR_ASSERT(s->collecting_stats == nullptr); + s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats; GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr); - s->recv_trailing_metadata_finished = add_closure_barrier(on_complete); + s->recv_trailing_metadata_finished = + op_payload->recv_trailing_metadata.recv_trailing_metadata_ready; s->recv_trailing_metadata = op_payload->recv_trailing_metadata.recv_trailing_metadata; s->final_metadata_requested = true; @@ -1960,11 +1956,12 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, } if (s->read_closed && s->frame_storage.length == 0 && !pending_data && s->recv_trailing_metadata_finished != nullptr) { + grpc_transport_move_stats(&s->stats, s->collecting_stats); + s->collecting_stats = nullptr; grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1], s->recv_trailing_metadata); - grpc_chttp2_complete_closure_step( - t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, - "recv_trailing_metadata_finished"); + null_then_run_closure(&s->recv_trailing_metadata_finished, + GRPC_ERROR_NONE); } } } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 420c2d13e1..4a252d972d 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -925,6 +925,10 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op, result = false; } /* Check if every op that was asked for is done. */ + /* TODO(muxi): We should not consider the recv ops here, since they + * have their own callbacks. We should invoke a batch's on_complete + * as soon as all of the batch's send ops are complete, even if + * there are still recv ops pending. */ else if (curr_op->send_initial_metadata && !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) { CRONET_LOG(GPR_DEBUG, "Because"); @@ -1280,12 +1284,20 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { op_can_be_run(stream_op, s, &oas->state, OP_RECV_TRAILING_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); - if (oas->s->state.rs.trailing_metadata_valid) { + grpc_error* error = GRPC_ERROR_NONE; + if (stream_state->state_op_done[OP_CANCEL_ERROR]) { + error = GRPC_ERROR_REF(stream_state->cancel_error); + } else if (stream_state->state_op_done[OP_FAILED]) { + error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."); + } else if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( &oas->s->state.rs.trailing_metadata, stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); stream_state->rs.trailing_metadata_valid = false; } + GRPC_CLOSURE_SCHED( + stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, + error); stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->cancel_stream && @@ -1398,6 +1410,11 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, GRPC_ERROR_CANCELLED); } + if (op->recv_trailing_metadata) { + GRPC_CLOSURE_SCHED( + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, + GRPC_ERROR_CANCELLED); + } GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED); return; } diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 2c3bff5c1e..b0ca7f8207 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -120,7 +120,6 @@ typedef struct inproc_stream { struct inproc_stream* stream_list_next; } inproc_stream; -static grpc_closure do_nothing_closure; static bool cancel_stream_locked(inproc_stream* s, grpc_error* error); static void op_state_machine(void* arg, grpc_error* error); @@ -373,6 +372,10 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, const char* msg) { int is_sm = static_cast<int>(op == s->send_message_op); int is_stm = static_cast<int>(op == s->send_trailing_md_op); + // TODO(vjpai): We should not consider the recv ops here, since they + // have their own callbacks. We should invoke a batch's on_complete + // as soon as all of the batch's send ops are complete, even if there + // are still recv ops pending. int is_rim = static_cast<int>(op == s->recv_initial_md_op); int is_rm = static_cast<int>(op == s->recv_message_op); int is_rtm = static_cast<int>(op == s->recv_trailing_md_op); @@ -496,6 +499,11 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { s->send_trailing_md_op = nullptr; } if (s->recv_trailing_md_op) { + INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p", + s, error); + GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + GRPC_ERROR_REF(error)); INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p", s, error); complete_if_batch_end_locked( @@ -639,6 +647,12 @@ static void op_state_machine(void* arg, grpc_error* error) { s->trailing_md_sent = true; if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { INPROC_LOG(GPR_INFO, + "op_state_machine %p scheduling trailing-metadata-ready", s); + GRPC_CLOSURE_SCHED( + s->recv_trailing_md_op->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + GRPC_ERROR_NONE); + INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-on-complete", s); GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete, GRPC_ERROR_NONE); @@ -711,6 +725,12 @@ static void op_state_machine(void* arg, grpc_error* error) { } if (s->recv_trailing_md_op && s->t->is_client && other && other->send_message_op) { + INPROC_LOG(GPR_INFO, + "op_state_machine %p scheduling trailing-metadata-ready %p", s, + GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + GRPC_ERROR_NONE); maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); } if (s->to_read_trailing_md_filled) { @@ -766,6 +786,10 @@ static void op_state_machine(void* arg, grpc_error* error) { INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-on-complete %p", s, new_err); + GRPC_CLOSURE_SCHED( + s->recv_trailing_md_op->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + GRPC_ERROR_REF(new_err)); GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete, GRPC_ERROR_REF(new_err)); s->recv_trailing_md_op = nullptr; @@ -859,6 +883,9 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { // couldn't complete that because we hadn't yet sent out trailing // md, now's the chance if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { + GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + GRPC_ERROR_REF(s->cancel_self_error)); complete_if_batch_end_locked( s, s->cancel_self_error, s->recv_trailing_md_op, "cancel_stream scheduling trailing-md-on-complete"); @@ -873,6 +900,8 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { return ret; } +static void do_nothing(void* arg, grpc_error* error) {} + static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, grpc_transport_stream_op_batch* op) { INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op); @@ -892,8 +921,14 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, } grpc_error* error = GRPC_ERROR_NONE; grpc_closure* on_complete = op->on_complete; + // TODO(roth): This is a hack needed because we use data inside of the + // closure itself to do the barrier calculation (i.e., to ensure that + // we don't schedule the closure until all ops in the batch have been + // completed). This can go away once we move to a new C++ closure API + // that provides the ability to create a barrier closure. if (on_complete == nullptr) { - on_complete = &do_nothing_closure; + on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing, + nullptr, grpc_schedule_on_exec_ctx); } if (op->cancel_stream) { @@ -1026,6 +1061,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); } + if (op->recv_trailing_metadata) { + INPROC_LOG( + GPR_INFO, + "perform_stream_op error %p scheduling trailing-metadata-ready %p", + s, error); + GRPC_CLOSURE_SCHED( + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, + GRPC_ERROR_REF(error)); + } } INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s, error); @@ -1129,12 +1173,8 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; } /******************************************************************************* * GLOBAL INIT AND DESTROY */ -static void do_nothing(void* arg, grpc_error* error) {} - void grpc_inproc_transport_init(void) { grpc_core::ExecCtx exec_ctx; - GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr, - grpc_schedule_on_exec_ctx); g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0); grpc_slice key_tmp = grpc_slice_from_static_string(":path"); diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index ddd3029402..e2ea334ded 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -51,6 +51,7 @@ typedef struct connected_channel_call_data { callback_state on_complete[6]; // Max number of pending batches. callback_state recv_initial_metadata_ready; callback_state recv_message_ready; + callback_state recv_trailing_metadata_ready; } call_data; static void run_in_call_combiner(void* arg, grpc_error* error) { @@ -111,6 +112,12 @@ static void con_start_transport_stream_op_batch( intercept_callback(calld, state, false, "recv_message_ready", &batch->payload->recv_message.recv_message_ready); } + if (batch->recv_trailing_metadata) { + callback_state* state = &calld->recv_trailing_metadata_ready; + intercept_callback( + calld, state, false, "recv_trailing_metadata_ready", + &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready); + } if (batch->cancel_stream) { // There can be more than one cancellation batch in flight at any // given time, so we can't just pick out a fixed index into @@ -121,7 +128,7 @@ static void con_start_transport_stream_op_batch( static_cast<callback_state*>(gpr_malloc(sizeof(*state))); intercept_callback(calld, state, true, "on_complete (cancel_stream)", &batch->on_complete); - } else { + } else if (batch->on_complete != nullptr) { callback_state* state = get_state_for_batch(calld, batch); intercept_callback(calld, state, false, "on_complete", &batch->on_complete); } diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h index f36f6cb706..0d2586e507 100644 --- a/src/core/lib/gprpp/inlined_vector.h +++ b/src/core/lib/gprpp/inlined_vector.h @@ -99,6 +99,8 @@ class InlinedVector { void push_back(T&& value) { emplace_back(std::move(value)); } size_t size() const { return size_; } + bool empty() const { return size_ == 0; } + size_t capacity() const { return capacity_; } void clear() { diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h index 0ccd08ea57..641fa18082 100644 --- a/src/core/lib/iomgr/call_combiner.h +++ b/src/core/lib/iomgr/call_combiner.h @@ -26,6 +26,7 @@ #include <grpc/support/atm.h> #include "src/core/lib/gpr/mpscq.h" +#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/iomgr/closure.h" // A simple, lock-free mechanism for serializing activity related to a @@ -109,4 +110,83 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, grpc_error* error); +namespace grpc_core { + +// Helper for running a list of closures in a call combiner. +// +// Each callback running in the call combiner will eventually be +// returned to the surface, at which point the surface will yield the +// call combiner. So when we are running in the call combiner and have +// more than one callback to return to the surface, we need to re-enter +// the call combiner for all but one of those callbacks. +class CallCombinerClosureList { + public: + CallCombinerClosureList() {} + + // Adds a closure to the list. The closure must eventually result in + // the call combiner being yielded. + void Add(grpc_closure* closure, grpc_error* error, const char* reason) { + closures_.emplace_back(closure, error, reason); + } + + // Runs all closures in the call combiner and yields the call combiner. + // + // All but one of the closures in the list will be scheduled via + // GRPC_CALL_COMBINER_START(), and the remaining closure will be + // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in + // yielding the call combiner. If the list is empty, then the call + // combiner will be yielded immediately. + void RunClosures(grpc_call_combiner* call_combiner) { + if (closures_.empty()) { + GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); + return; + } + for (size_t i = 1; i < closures_.size(); ++i) { + auto& closure = closures_[i]; + GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, + closure.reason); + } + if (grpc_call_combiner_trace.enabled()) { + gpr_log(GPR_INFO, + "CallCombinerClosureList executing closure while already " + "holding call_combiner %p: closure=%p error=%s reason=%s", + call_combiner, closures_[0].closure, + grpc_error_string(closures_[0].error), closures_[0].reason); + } + // This will release the call combiner. + GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error); + closures_.clear(); + } + + // Runs all closures in the call combiner, but does NOT yield the call + // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). + void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) { + for (size_t i = 0; i < closures_.size(); ++i) { + auto& closure = closures_[i]; + GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, + closure.reason); + } + closures_.clear(); + } + + size_t size() const { return closures_.size(); } + + private: + struct CallCombinerClosure { + grpc_closure* closure; + grpc_error* error; + const char* reason; + + CallCombinerClosure(grpc_closure* closure, grpc_error* error, + const char* reason) + : closure(closure), error(error), reason(reason) {} + }; + + // There are generally a maximum of 6 closures to run in the call + // combiner, one for each pending op. + InlinedVector<CallCombinerClosure, 6> closures_; +}; + +} // namespace grpc_core + #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 34a494485d..f14c723844 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -283,9 +283,10 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) { if (c->scheduled) { gpr_log(GPR_ERROR, "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", + "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], " + "run?: %s", c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); + c->line_initiated, file, line, c->run ? "true" : "false"); abort(); } c->scheduled = true; diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.cc b/src/core/lib/security/credentials/google_default/google_default_credentials.cc index 38c9175717..c456ffaf5d 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.cc +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.cc @@ -231,7 +231,8 @@ end: creds->base.vtable = &google_default_credentials_vtable; creds->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_GOOGLE_DEFAULT; gpr_ref_init(&creds->base.refcount, 1); - creds->ssl_creds = grpc_ssl_credentials_create(nullptr, nullptr, nullptr); + creds->ssl_creds = + grpc_ssl_credentials_create(nullptr, nullptr, nullptr, nullptr); GPR_ASSERT(creds->ssl_creds != nullptr); grpc_alts_credentials_options* options = grpc_alts_credentials_client_options_create(); diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc index 2b6377d3ec..3d6f2f200a 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc @@ -48,6 +48,10 @@ static void ssl_destruct(grpc_channel_credentials* creds) { grpc_ssl_credentials* c = reinterpret_cast<grpc_ssl_credentials*>(creds); gpr_free(c->config.pem_root_certs); grpc_tsi_ssl_pem_key_cert_pairs_destroy(c->config.pem_key_cert_pair, 1); + if (c->config.verify_options.verify_peer_destruct != nullptr) { + c->config.verify_options.verify_peer_destruct( + c->config.verify_options.verify_peer_callback_userdata); + } } static grpc_security_status ssl_create_security_connector( @@ -87,6 +91,7 @@ static grpc_channel_credentials_vtable ssl_vtable = { static void ssl_build_config(const char* pem_root_certs, grpc_ssl_pem_key_cert_pair* pem_key_cert_pair, + const verify_peer_options* verify_options, grpc_ssl_config* config) { if (pem_root_certs != nullptr) { config->pem_root_certs = gpr_strdup(pem_root_certs); @@ -101,23 +106,32 @@ static void ssl_build_config(const char* pem_root_certs, config->pem_key_cert_pair->private_key = gpr_strdup(pem_key_cert_pair->private_key); } + if (verify_options != nullptr) { + memcpy(&config->verify_options, verify_options, + sizeof(verify_peer_options)); + } else { + // Otherwise set all options to default values + memset(&config->verify_options, 0, sizeof(verify_peer_options)); + } } grpc_channel_credentials* grpc_ssl_credentials_create( const char* pem_root_certs, grpc_ssl_pem_key_cert_pair* pem_key_cert_pair, - void* reserved) { + const verify_peer_options* verify_options, void* reserved) { grpc_ssl_credentials* c = static_cast<grpc_ssl_credentials*>( gpr_zalloc(sizeof(grpc_ssl_credentials))); GRPC_API_TRACE( "grpc_ssl_credentials_create(pem_root_certs=%s, " "pem_key_cert_pair=%p, " + "verify_options=%p, " "reserved=%p)", - 3, (pem_root_certs, pem_key_cert_pair, reserved)); + 4, (pem_root_certs, pem_key_cert_pair, verify_options, reserved)); GPR_ASSERT(reserved == nullptr); c->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_SSL; c->base.vtable = &ssl_vtable; gpr_ref_init(&c->base.refcount, 1); - ssl_build_config(pem_root_certs, pem_key_cert_pair, &c->config); + ssl_build_config(pem_root_certs, pem_key_cert_pair, verify_options, + &c->config); return &c->base; } diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index b54a7643e4..cc72bb6164 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -620,6 +620,7 @@ typedef struct { tsi_ssl_client_handshaker_factory* client_handshaker_factory; char* target_name; char* overridden_target_name; + const verify_peer_options* verify_options; } grpc_ssl_channel_security_connector; typedef struct { @@ -878,11 +879,34 @@ static void ssl_channel_check_peer(grpc_security_connector* sc, tsi_peer peer, grpc_closure* on_peer_checked) { grpc_ssl_channel_security_connector* c = reinterpret_cast<grpc_ssl_channel_security_connector*>(sc); - grpc_error* error = ssl_check_peer(sc, - c->overridden_target_name != nullptr - ? c->overridden_target_name - : c->target_name, - &peer, auth_context); + const char* target_name = c->overridden_target_name != nullptr + ? c->overridden_target_name + : c->target_name; + grpc_error* error = ssl_check_peer(sc, target_name, &peer, auth_context); + if (error == GRPC_ERROR_NONE && + c->verify_options->verify_peer_callback != nullptr) { + const tsi_peer_property* p = + tsi_peer_get_property_by_name(&peer, TSI_X509_PEM_CERT_PROPERTY); + if (p == nullptr) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Cannot check peer: missing pem cert property."); + } else { + char* peer_pem = static_cast<char*>(gpr_malloc(p->value.length + 1)); + memcpy(peer_pem, p->value.data, p->value.length); + peer_pem[p->value.length] = '\0'; + int callback_status = c->verify_options->verify_peer_callback( + target_name, peer_pem, + c->verify_options->verify_peer_callback_userdata); + gpr_free(peer_pem); + if (callback_status) { + char* msg; + gpr_asprintf(&msg, "Verify peer callback returned a failure (%d)", + callback_status); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + } + } + } GRPC_CLOSURE_SCHED(on_peer_checked, error); tsi_peer_destruct(&peer); } @@ -1047,6 +1071,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create( if (overridden_target_name != nullptr) { c->overridden_target_name = gpr_strdup(overridden_target_name); } + c->verify_options = &config->verify_options; has_key_cert_pair = config->pem_key_cert_pair != nullptr && config->pem_key_cert_pair->private_key != nullptr && diff --git a/src/core/lib/security/security_connector/security_connector.h b/src/core/lib/security/security_connector/security_connector.h index f9723166d0..67a506b576 100644 --- a/src/core/lib/security/security_connector/security_connector.h +++ b/src/core/lib/security/security_connector/security_connector.h @@ -193,6 +193,7 @@ grpc_server_security_connector* grpc_fake_server_security_connector_create( typedef struct { tsi_ssl_pem_key_cert_pair* pem_key_cert_pair; char* pem_root_certs; + verify_peer_options verify_options; } grpc_ssl_config; /* Creates an SSL channel_security_connector. diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 2434c9b952..f94035459f 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -233,6 +233,7 @@ struct grpc_call { grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; + grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; grpc_closure release_call; @@ -270,8 +271,17 @@ struct grpc_call { grpc_core::TraceFlag grpc_call_error_trace(false, "call_error"); grpc_core::TraceFlag grpc_compression_trace(false, "compression"); -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1)) -#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1) +/* Given a size, round up to the next multiple of sizeof(void*) */ +#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ + (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) + +#define CALL_STACK_FROM_CALL(call) \ + (grpc_call_stack*)((char*)(call) + \ + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call))) +#define CALL_FROM_CALL_STACK(call_stack) \ + (grpc_call*)(((char*)(call_stack)) - \ + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call))) + #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) #define CALL_FROM_TOP_ELEM(top_elem) \ @@ -342,8 +352,9 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, size_t initial_size = grpc_channel_get_call_size_estimate(args->channel); GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size); gpr_arena* arena = gpr_arena_create(initial_size); - call = static_cast<grpc_call*>(gpr_arena_alloc( - arena, sizeof(grpc_call) + channel_stack->call_stack_size)); + call = static_cast<grpc_call*>( + gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + + channel_stack->call_stack_size)); gpr_ref_init(&call->ext_ref, 1); call->arena = arena; grpc_call_combiner_init(&call->call_combiner); @@ -1211,7 +1222,6 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( - &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } if (bctl->op.send_message) { @@ -1219,14 +1229,9 @@ static void post_batch_completion(batch_control* bctl) { } if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( - &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } if (bctl->op.recv_trailing_metadata) { - grpc_metadata_batch* md = - &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(call, md); - /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); parent_call* pc = get_parent_call(call); @@ -1248,7 +1253,6 @@ static void post_batch_completion(batch_control* bctl) { } gpr_mu_unlock(&pc->child_list_mu); } - if (call->is_client) { get_final_status(call, set_status_value_directly, call->final_op.client.status, @@ -1258,7 +1262,6 @@ static void post_batch_completion(batch_control* bctl) { get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, nullptr, nullptr); } - grpc_core::channelz::ChannelNode* channelz_channel = grpc_channel_get_channelz_node(call->channel); if (*call->final_op.client.status != GRPC_STATUS_OK) { @@ -1266,7 +1269,6 @@ static void post_batch_completion(batch_control* bctl) { } else { channelz_channel->RecordCallSucceeded(); } - GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } @@ -1548,6 +1550,17 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { finish_batch_step(bctl); } +static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { + batch_control* bctl = static_cast<batch_control*>(bctlp); + grpc_call* call = bctl->call; + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); + add_batch_error(bctl, GRPC_ERROR_REF(error), false); + grpc_metadata_batch* md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + recv_trailing_filter(call, md); + finish_batch_step(bctl); +} + static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; @@ -1568,7 +1581,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, size_t i; const grpc_op* op; batch_control* bctl; - int num_completion_callbacks_needed = 1; + bool has_send_ops = false; + int num_recv_ops = 0; grpc_call_error error = GRPC_CALL_OK; grpc_transport_stream_op_batch* stream_op; grpc_transport_stream_op_batch_payload* stream_op_payload; @@ -1674,6 +1688,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->send_initial_metadata.peer_string = &call->peer_string; } + has_send_ops = true; break; } case GRPC_OP_SEND_MESSAGE: { @@ -1703,6 +1718,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, &op->data.send_message.send_message->data.raw.slice_buffer, flags); stream_op_payload->send_message.send_message.reset( call->sending_stream.get()); + has_send_ops = true; break; } case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { @@ -1723,6 +1739,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->sent_final_op = true; stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; + has_send_ops = true; break; } case GRPC_OP_SEND_STATUS_FROM_SERVER: { @@ -1787,6 +1804,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, } stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; + has_send_ops = true; break; } case GRPC_OP_RECV_INITIAL_METADATA: { @@ -1814,7 +1832,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->recv_initial_metadata.peer_string = &call->peer_string; } - num_completion_callbacks_needed++; + ++num_recv_ops; break; } case GRPC_OP_RECV_MESSAGE: { @@ -1836,7 +1854,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; - num_completion_callbacks_needed++; + ++num_recv_ops; break; } case GRPC_OP_RECV_STATUS_ON_CLIENT: { @@ -1862,11 +1880,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.client.error_string = op->data.recv_status_on_client.error_string; stream_op->recv_trailing_metadata = true; - stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->collect_stats.collect_stats = + stream_op_payload->recv_trailing_metadata.collect_stats = &call->final_info.stats.transport_stream_stats; + GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, + receiving_trailing_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->receiving_trailing_metadata_ready; + ++num_recv_ops; break; } case GRPC_OP_RECV_CLOSE_ON_SERVER: { @@ -1887,11 +1910,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; stream_op->recv_trailing_metadata = true; - stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->collect_stats.collect_stats = + stream_op_payload->recv_trailing_metadata.collect_stats = &call->final_info.stats.transport_stream_stats; + GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, + receiving_trailing_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->receiving_trailing_metadata_ready; + ++num_recv_ops; break; } } @@ -1901,13 +1929,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } - gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); + gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops); - GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, - grpc_schedule_on_exec_ctx); - stream_op->on_complete = &bctl->finish_batch; - gpr_atm_rel_store(&call->any_ops_sent_atm, 1); + if (has_send_ops) { + GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, + grpc_schedule_on_exec_ctx); + stream_op->on_complete = &bctl->finish_batch; + } + gpr_atm_rel_store(&call->any_ops_sent_atm, 1); execute_batch(call, stream_op, &bctl->start_batch); done: diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 039d603394..cbdb77c844 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -212,21 +212,32 @@ void grpc_transport_stream_op_batch_finish_with_failure( if (batch->send_message) { batch->payload->send_message.send_message.reset(); } - if (batch->recv_message) { - GRPC_CALL_COMBINER_START( - call_combiner, batch->payload->recv_message.recv_message_ready, - GRPC_ERROR_REF(error), "failing recv_message_ready"); + if (batch->cancel_stream) { + GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); } + // Construct a list of closures to execute. + grpc_core::CallCombinerClosureList closures; if (batch->recv_initial_metadata) { - GRPC_CALL_COMBINER_START( - call_combiner, + closures.Add( batch->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); } - GRPC_CLOSURE_SCHED(batch->on_complete, error); - if (batch->cancel_stream) { - GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); + if (batch->recv_message) { + closures.Add(batch->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error), "failing recv_message_ready"); + } + if (batch->recv_trailing_metadata) { + closures.Add( + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready, + GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready"); + } + if (batch->on_complete != nullptr) { + closures.Add(batch->on_complete, GRPC_ERROR_REF(error), + "failing on_complete"); } + // Execute closures. + closures.RunClosures(call_combiner); + GRPC_ERROR_UNREF(error); } typedef struct { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index b2e252d939..585b9dfae9 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -122,9 +122,15 @@ typedef struct grpc_transport_stream_op_batch_payload /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op_batch { - /** Should be enqueued when all requested operations (excluding recv_message - and recv_initial_metadata which have their own closures) in a given batch - have been completed. */ + /** Should be scheduled when all of the non-recv operations in the batch + are complete. + + The recv ops (recv_initial_metadata, recv_message, and + recv_trailing_metadata) each have their own callbacks. If a batch + contains both recv ops and non-recv ops, on_complete should be + scheduled as soon as the non-recv ops are complete, regardless of + whether or not the recv ops are complete. If a batch contains + only recv ops, on_complete can be null. */ grpc_closure* on_complete; /** Values for the stream op (fields set are determined by flags above) */ @@ -149,9 +155,6 @@ typedef struct grpc_transport_stream_op_batch { */ bool recv_trailing_metadata : 1; - /** Collect any stats into provided buffer, zero internal stat counters */ - bool collect_stats : 1; - /** Cancel this stream with the provided error */ bool cancel_stream : 1; @@ -219,11 +222,10 @@ struct grpc_transport_stream_op_batch_payload { struct { grpc_metadata_batch* recv_trailing_metadata; - } recv_trailing_metadata; - - struct { grpc_transport_stream_stats* collect_stats; - } collect_stats; + /** Should be enqueued when initial metadata is ready to be processed. */ + grpc_closure* recv_trailing_metadata_ready; + } recv_trailing_metadata; /** Forcefully close this stream. The HTTP2 semantics should be: diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index 25ab492f3a..8c7db642a5 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -120,13 +120,6 @@ char* grpc_transport_stream_op_batch_string( gpr_strvec_add(&b, tmp); } - if (op->collect_stats) { - gpr_strvec_add(&b, gpr_strdup(" ")); - gpr_asprintf(&tmp, "COLLECT_STATS:%p", - op->payload->collect_stats.collect_stats); - gpr_strvec_add(&b, tmp); - } - out = gpr_strvec_flatten(&b, nullptr); gpr_strvec_destroy(&b); diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 00245b397d..bdb6359632 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -83,7 +83,8 @@ std::shared_ptr<ChannelCredentials> SslCredentials( grpc_channel_credentials* c_creds = grpc_ssl_credentials_create( options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(), - options.pem_private_key.empty() ? nullptr : &pem_key_cert_pair, nullptr); + options.pem_private_key.empty() ? nullptr : &pem_key_cert_pair, nullptr, + nullptr); return WrapChannelCredentials(c_creds); } diff --git a/src/cpp/ext/filters/census/channel_filter.cc b/src/cpp/ext/filters/census/channel_filter.cc new file mode 100644 index 0000000000..4ac684d277 --- /dev/null +++ b/src/cpp/ext/filters/census/channel_filter.cc @@ -0,0 +1,30 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/channel_filter.h" + +namespace grpc { + +grpc_error* CensusChannelData::Init(grpc_channel_element* elem, + grpc_channel_element_args* args) { + return GRPC_ERROR_NONE; +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/channel_filter.h b/src/cpp/ext/filters/census/channel_filter.h new file mode 100644 index 0000000000..0b7c682681 --- /dev/null +++ b/src/cpp/ext/filters/census/channel_filter.h @@ -0,0 +1,36 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +class CensusChannelData : public ChannelData { + public: + grpc_error* Init(grpc_channel_element* elem, + grpc_channel_element_args* args) override; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H */ diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc new file mode 100644 index 0000000000..293f4b1c25 --- /dev/null +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -0,0 +1,163 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/client_filter.h" + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "opencensus/stats/stats.h" +#include "src/core/lib/surface/call.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/cpp/ext/filters/census/measures.h" + +namespace grpc { + +constexpr uint32_t CensusClientCallData::kMaxTraceContextLen; +constexpr uint32_t CensusClientCallData::kMaxTagsLen; + +namespace { + +void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) { + if (b->idx.named.grpc_server_stats_bin != nullptr) { + ServerStatsDeserialize( + reinterpret_cast<const char*>(GRPC_SLICE_START_PTR( + GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))), + GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)), + elapsed_time); + grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin); + } +} + +} // namespace + +void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + CensusClientCallData* calld = + reinterpret_cast<CensusClientCallData*>(elem->call_data); + GPR_ASSERT(calld != nullptr); + if (error == GRPC_ERROR_NONE) { + GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr); + FilterTrailingMetadata(calld->recv_trailing_metadata_, + &calld->elapsed_time_); + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); +} + +void CensusClientCallData::OnDoneRecvMessageCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + CensusClientCallData* calld = + reinterpret_cast<CensusClientCallData*>(elem->call_data); + CensusChannelData* channeld = + reinterpret_cast<CensusChannelData*>(elem->channel_data); + GPR_ASSERT(calld != nullptr); + GPR_ASSERT(channeld != nullptr); + // Stream messages are no longer valid after receiving trailing metadata. + if ((*calld->recv_message_) != nullptr) { + calld->recv_message_count_++; + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); +} + +void CensusClientCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + if (op->send_initial_metadata() != nullptr) { + census_context* ctxt = op->get_census_context(); + GenerateClientContext( + qualified_method_, &context_, + (ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt)); + size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_, + kMaxTraceContextLen); + if (tracing_len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_initial_metadata()->batch(), &tracing_bin_, + grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_TRACE_BIN, + grpc_slice_from_copied_buffer(tracing_buf_, tracing_len)))); + } + grpc_slice tags = grpc_empty_slice(); + // TODO: Add in tagging serialization. + size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags); + if (encoded_tags_len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_initial_metadata()->batch(), &stats_bin_, + grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags))); + } + } + + if (op->send_message() != nullptr) { + ++sent_message_count_; + } + if (op->recv_message() != nullptr) { + recv_message_ = op->op()->payload->recv_message.recv_message; + initial_on_done_recv_message_ = + op->op()->payload->recv_message.recv_message_ready; + op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; + } + if (op->recv_trailing_metadata() != nullptr) { + recv_trailing_metadata_ = op->recv_trailing_metadata()->batch(); + initial_on_done_recv_trailing_metadata_ = op->on_complete(); + op->set_on_complete(&on_done_recv_trailing_metadata_); + } + // Call next op. + grpc_call_next_op(elem, op->op()); +} + +grpc_error* CensusClientCallData::Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + path_ = grpc_slice_ref_internal(args->path); + start_time_ = absl::Now(); + method_ = GetMethod(&path_); + qualified_method_ = absl::StrCat("Sent.", method_); + GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_, + OnDoneRecvTrailingMetadataCb, elem, + grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; +} + +void CensusClientCallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + const uint64_t request_size = GetOutgoingDataSize(final_info); + const uint64_t response_size = GetIncomingDataSize(final_info); + double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_); + ::opencensus::stats::Record( + {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)}, + {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)}, + {RpcClientRoundtripLatency(), latency_ms}, + {RpcClientServerLatency(), + ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}, + {RpcClientSentMessagesPerRpc(), sent_message_count_}, + {RpcClientReceivedMessagesPerRpc(), recv_message_count_}}, + {{ClientMethodTagKey(), method_}, + {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}}); + grpc_slice_unref_internal(path_); + context_.EndSpan(); +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/client_filter.h b/src/cpp/ext/filters/census/client_filter.h new file mode 100644 index 0000000000..851022873f --- /dev/null +++ b/src/cpp/ext/filters/census/client_filter.h @@ -0,0 +1,104 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H + +#include <grpc/support/port_platform.h> + +#include "absl/strings/string_view.h" +#include "absl/time/time.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +// A CallData class will be created for every grpc call within a channel. It is +// used to store data and methods specific to that call. CensusClientCallData is +// thread-compatible, however typically only 1 thread should be interacting with +// a call at a time. +class CensusClientCallData : public CallData { + public: + // Maximum size of trace context is sent on the wire. + static constexpr uint32_t kMaxTraceContextLen = 64; + // Maximum size of tags that are sent on the wire. + static constexpr uint32_t kMaxTagsLen = 2048; + + CensusClientCallData() + : recv_trailing_metadata_(nullptr), + initial_on_done_recv_trailing_metadata_(nullptr), + initial_on_done_recv_message_(nullptr), + elapsed_time_(0), + recv_message_(nullptr), + recv_message_count_(0), + sent_message_count_(0) { + memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&path_, 0, sizeof(grpc_slice)); + memset(&on_done_recv_trailing_metadata_, 0, sizeof(grpc_closure)); + memset(&on_done_recv_message_, 0, sizeof(grpc_closure)); + } + + grpc_error* Init(grpc_call_element* elem, + const grpc_call_element_args* args) override; + + void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) override; + + void StartTransportStreamOpBatch(grpc_call_element* elem, + TransportStreamOpBatch* op) override; + + static void OnDoneRecvTrailingMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneSendInitialMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneRecvMessageCb(void* user_data, grpc_error* error); + + private: + CensusContext context_; + // Metadata elements for tracing and census stats data. + grpc_linked_mdelem stats_bin_; + grpc_linked_mdelem tracing_bin_; + // Client method. + absl::string_view method_; + std::string qualified_method_; + grpc_slice path_; + // The recv trailing metadata callbacks. + grpc_metadata_batch* recv_trailing_metadata_; + grpc_closure* initial_on_done_recv_trailing_metadata_; + grpc_closure on_done_recv_trailing_metadata_; + // recv message + grpc_closure* initial_on_done_recv_message_; + grpc_closure on_done_recv_message_; + // Start time (for measuring latency). + absl::Time start_time_; + // Server elapsed time in nanoseconds. + uint64_t elapsed_time_; + // The received message--may be null. + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_; + // Number of messages in this RPC. + uint64_t recv_message_count_; + uint64_t sent_message_count_; + // Buffer needed for grpc_slice to reference when adding trace context + // metatdata to outgoing message. + char tracing_buf_[kMaxTraceContextLen]; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H */ diff --git a/src/cpp/ext/filters/census/context.cc b/src/cpp/ext/filters/census/context.cc new file mode 100644 index 0000000000..4b3250236d --- /dev/null +++ b/src/cpp/ext/filters/census/context.cc @@ -0,0 +1,132 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +using ::opencensus::trace::Span; +using ::opencensus::trace::SpanContext; + +void GenerateServerContext(absl::string_view tracing, absl::string_view stats, + absl::string_view primary_role, + absl::string_view method, CensusContext* context) { + GrpcTraceContext trace_ctxt; + TraceContextEncoding::Decode(tracing, &trace_ctxt); + SpanContext parent_ctx = trace_ctxt.ToSpanContext(); + new (context) CensusContext(method, parent_ctx); +} + +void GenerateClientContext(absl::string_view method, CensusContext* ctxt, + CensusContext* parent_ctxt) { + if (parent_ctxt != nullptr) { + SpanContext span_ctxt = parent_ctxt->Context(); + Span span = parent_ctxt->Span(); + if (span_ctxt.IsValid()) { + new (ctxt) CensusContext(method, &span); + return; + } + } + new (ctxt) CensusContext(method); +} + +size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context, + char* tracing_buf, size_t tracing_buf_size) { + GrpcTraceContext trace_ctxt(context); + return TraceContextEncoding::Encode(trace_ctxt, tracing_buf, + tracing_buf_size); +} + +size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags) { + // TODO: Add implementation. Waiting on stats tagging to be added. + return 0; +} + +size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf, + size_t buf_size) { + return RpcServerStatsEncoding::Encode(server_elapsed_time, buf, buf_size); +} + +size_t ServerStatsDeserialize(const char* buf, size_t buf_size, + uint64_t* server_elapsed_time) { + return RpcServerStatsEncoding::Decode(absl::string_view(buf, buf_size), + server_elapsed_time); +} + +uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info) { + return final_info->stats.transport_stream_stats.incoming.data_bytes; +} + +uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info) { + return final_info->stats.transport_stream_stats.outgoing.data_bytes; +} + +SpanContext SpanContextFromCensusContext(const census_context* ctxt) { + return reinterpret_cast<const CensusContext*>(ctxt)->Context(); +} + +Span SpanFromCensusContext(const census_context* ctxt) { + return reinterpret_cast<const CensusContext*>(ctxt)->Span(); +} + +absl::string_view StatusCodeToString(grpc_status_code code) { + switch (code) { + case GRPC_STATUS_OK: + return "OK"; + case GRPC_STATUS_CANCELLED: + return "CANCELLED"; + case GRPC_STATUS_UNKNOWN: + return "UNKNOWN"; + case GRPC_STATUS_INVALID_ARGUMENT: + return "INVALID_ARGUMENT"; + case GRPC_STATUS_DEADLINE_EXCEEDED: + return "DEADLINE_EXCEEDED"; + case GRPC_STATUS_NOT_FOUND: + return "NOT_FOUND"; + case GRPC_STATUS_ALREADY_EXISTS: + return "ALREADY_EXISTS"; + case GRPC_STATUS_PERMISSION_DENIED: + return "PERMISSION_DENIED"; + case GRPC_STATUS_UNAUTHENTICATED: + return "UNAUTHENTICATED"; + case GRPC_STATUS_RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED"; + case GRPC_STATUS_FAILED_PRECONDITION: + return "FAILED_PRECONDITION"; + case GRPC_STATUS_ABORTED: + return "ABORTED"; + case GRPC_STATUS_OUT_OF_RANGE: + return "OUT_OF_RANGE"; + case GRPC_STATUS_UNIMPLEMENTED: + return "UNIMPLEMENTED"; + case GRPC_STATUS_INTERNAL: + return "INTERNAL"; + case GRPC_STATUS_UNAVAILABLE: + return "UNAVAILABLE"; + case GRPC_STATUS_DATA_LOSS: + return "DATA_LOSS"; + default: + // gRPC wants users of this enum to include a default branch so that + // adding values is not a breaking change. + return "UNKNOWN_STATUS"; + } +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/context.h b/src/cpp/ext/filters/census/context.h new file mode 100644 index 0000000000..1643fdd11b --- /dev/null +++ b/src/cpp/ext/filters/census/context.h @@ -0,0 +1,126 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H + +#include <grpc/support/port_platform.h> + +#include <grpc/status.h> +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/strings/strip.h" +#include "opencensus/trace/span.h" +#include "opencensus/trace/span_context.h" +#include "opencensus/trace/trace_params.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/cpp/common/channel_filter.h" +#include "src/cpp/ext/filters/census/rpc_encoding.h" + +// This is needed because grpc has hardcoded CensusContext with a +// forward declaration of 'struct census_context;' +struct census_context; + +namespace grpc { + +// Thread compatible. +class CensusContext { + public: + CensusContext() : span_(::opencensus::trace::Span::BlankSpan()) {} + + explicit CensusContext(absl::string_view name) + : span_(::opencensus::trace::Span::StartSpan(name)) {} + + CensusContext(absl::string_view name, const ::opencensus::trace::Span* parent) + : span_(::opencensus::trace::Span::StartSpan(name, parent)) {} + + CensusContext(absl::string_view name, + const ::opencensus::trace::SpanContext& parent_ctxt) + : span_(::opencensus::trace::Span::StartSpanWithRemoteParent( + name, parent_ctxt)) {} + + ::opencensus::trace::SpanContext Context() const { return span_.context(); } + ::opencensus::trace::Span Span() const { return span_; } + void EndSpan() { span_.End(); } + + private: + ::opencensus::trace::Span span_; +}; + +// Serializes the outgoing trace context. Field IDs are 1 byte followed by +// field data. A 1 byte version ID is always encoded first. +size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context, + char* tracing_buf, size_t tracing_buf_size); + +// Serializes the outgoing stats context. Field IDs are 1 byte followed by +// field data. A 1 byte version ID is always encoded first. Tags are directly +// serialized into the given grpc_slice. +size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags); + +// Serialize outgoing server stats. Returns the number of bytes serialized. +size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf, + size_t buf_size); + +// Deserialize incoming server stats. Returns the number of bytes deserialized. +size_t ServerStatsDeserialize(const char* buf, size_t buf_size, + uint64_t* server_elapsed_time); + +// Deserialize the incoming SpanContext and generate a new server context based +// on that. This new span will never be a root span. This should only be called +// with a blank CensusContext as it overwrites it. +void GenerateServerContext(absl::string_view tracing, absl::string_view stats, + absl::string_view primary_role, + absl::string_view method, CensusContext* context); + +// Creates a new client context that is by default a new root context. +// If the current context is the default context then the newly created +// span automatically becomes a root span. This should only be called with a +// blank CensusContext as it overwrites it. +void GenerateClientContext(absl::string_view method, CensusContext* ctxt, + CensusContext* parent_ctx); + +// Returns the incoming data size from the grpc call final info. +uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info); + +// Returns the outgoing data size from the grpc call final info. +uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info); + +// These helper functions return the SpanContext and Span, respectively +// associated with the census_context* stored by grpc. The user will need to +// call this for manual propagation of tracing data. +::opencensus::trace::SpanContext SpanContextFromCensusContext( + const census_context* ctxt); +::opencensus::trace::Span SpanFromCensusContext(const census_context* ctxt); + +// Returns a string representation of the StatusCode enum. +absl::string_view StatusCodeToString(grpc_status_code code); + +inline absl::string_view GetMethod(const grpc_slice* path) { + if (GRPC_SLICE_IS_EMPTY(*path)) { + return ""; + } + // Check for leading '/' and trim it if present. + return absl::StripPrefix(absl::string_view(reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(*path)), + GRPC_SLICE_LENGTH(*path)), + "/"); +} + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H */ diff --git a/src/core/ext/census/grpc_context.cc b/src/cpp/ext/filters/census/grpc_context.cc index 599a798dda..599a798dda 100644 --- a/src/core/ext/census/grpc_context.cc +++ b/src/cpp/ext/filters/census/grpc_context.cc diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc new file mode 100644 index 0000000000..f978ed3bf5 --- /dev/null +++ b/src/cpp/ext/filters/census/grpc_plugin.cc @@ -0,0 +1,130 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +#include <grpcpp/server_context.h> + +#include "opencensus/trace/span.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/client_filter.h" +#include "src/cpp/ext/filters/census/measures.h" +#include "src/cpp/ext/filters/census/server_filter.h" + +namespace grpc { + +void RegisterOpenCensusPlugin() { + RegisterChannelFilter<CensusChannelData, CensusClientCallData>( + "opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */, + nullptr /* condition function */); + RegisterChannelFilter<CensusChannelData, CensusServerCallData>( + "opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */, + nullptr /* condition function */); + + // Access measures to ensure they are initialized. Otherwise, creating a view + // before the first RPC would cause an error. + RpcClientSentBytesPerRpc(); + RpcClientReceivedBytesPerRpc(); + RpcClientRoundtripLatency(); + RpcClientServerLatency(); + RpcClientSentMessagesPerRpc(); + RpcClientReceivedMessagesPerRpc(); + + RpcServerSentBytesPerRpc(); + RpcServerReceivedBytesPerRpc(); + RpcServerServerLatency(); + RpcServerSentMessagesPerRpc(); + RpcServerReceivedMessagesPerRpc(); +} + +::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context) { + return reinterpret_cast<const CensusContext*>(context->census_context()) + ->Span(); +} + +// These measure definitions should be kept in sync across opencensus +// implementations--see +// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java. +::opencensus::stats::TagKey ClientMethodTagKey() { + static const auto method_tag_key = + ::opencensus::stats::TagKey::Register("grpc_client_method"); + return method_tag_key; +} + +::opencensus::stats::TagKey ClientStatusTagKey() { + static const auto status_tag_key = + ::opencensus::stats::TagKey::Register("grpc_client_status"); + return status_tag_key; +} + +::opencensus::stats::TagKey ServerMethodTagKey() { + static const auto method_tag_key = + ::opencensus::stats::TagKey::Register("grpc_server_method"); + return method_tag_key; +} + +::opencensus::stats::TagKey ServerStatusTagKey() { + static const auto status_tag_key = + ::opencensus::stats::TagKey::Register("grpc_server_status"); + return status_tag_key; +} + +// Client +ABSL_CONST_INIT const absl::string_view + kRpcClientSentMessagesPerRpcMeasureName = + "grpc.io/client/sent_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcClientSentBytesPerRpcMeasureName = + "grpc.io/client/sent_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcClientReceivedMessagesPerRpcMeasureName = + "grpc.io/client/received_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcClientReceivedBytesPerRpcMeasureName = + "grpc.io/client/received_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcClientRoundtripLatencyMeasureName = + "grpc.io/client/roundtrip_latency"; + +ABSL_CONST_INIT const absl::string_view kRpcClientServerLatencyMeasureName = + "grpc.io/client/server_latency"; + +// Server +ABSL_CONST_INIT const absl::string_view + kRpcServerSentMessagesPerRpcMeasureName = + "grpc.io/server/sent_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcServerSentBytesPerRpcMeasureName = + "grpc.io/server/sent_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcServerReceivedMessagesPerRpcMeasureName = + "grpc.io/server/received_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcServerReceivedBytesPerRpcMeasureName = + "grpc.io/server/received_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcServerServerLatencyMeasureName = + "grpc.io/server/server_latency"; + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/grpc_plugin.h b/src/cpp/ext/filters/census/grpc_plugin.h new file mode 100644 index 0000000000..7ff2e7a8b8 --- /dev/null +++ b/src/cpp/ext/filters/census/grpc_plugin.h @@ -0,0 +1,111 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H + +#include <grpc/support/port_platform.h> + +#include "absl/strings/string_view.h" +#include "include/grpcpp/opencensus.h" +#include "opencensus/stats/stats.h" +#include "opencensus/trace/span.h" + +namespace grpc { + +class ServerContext; + +// Returns the tracing Span for the current RPC. +::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context); + +// The tag keys set when recording RPC stats. +::opencensus::stats::TagKey ClientMethodTagKey(); +::opencensus::stats::TagKey ClientStatusTagKey(); +::opencensus::stats::TagKey ServerMethodTagKey(); +::opencensus::stats::TagKey ServerStatusTagKey(); + +// Names of measures used by the plugin--users can create views on these +// measures but should not record data for them. +extern const absl::string_view kRpcClientSentMessagesPerRpcMeasureName; +extern const absl::string_view kRpcClientSentBytesPerRpcMeasureName; +extern const absl::string_view kRpcClientReceivedMessagesPerRpcMeasureName; +extern const absl::string_view kRpcClientReceivedBytesPerRpcMeasureName; +extern const absl::string_view kRpcClientRoundtripLatencyMeasureName; +extern const absl::string_view kRpcClientServerLatencyMeasureName; + +extern const absl::string_view kRpcServerSentMessagesPerRpcMeasureName; +extern const absl::string_view kRpcServerSentBytesPerRpcMeasureName; +extern const absl::string_view kRpcServerReceivedMessagesPerRpcMeasureName; +extern const absl::string_view kRpcServerReceivedBytesPerRpcMeasureName; +extern const absl::string_view kRpcServerServerLatencyMeasureName; + +// Canonical gRPC view definitions. +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ClientReceivedMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ClientReceivedBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsCumulative(); + +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ServerReceivedBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerStartedCountCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ServerReceivedMessagesPerRpcCumulative(); + +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsMinute(); + +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsMinute(); + +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsHour(); + +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ServerStartedCountHour(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsHour(); + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H */ diff --git a/src/cpp/ext/filters/census/measures.cc b/src/cpp/ext/filters/census/measures.cc new file mode 100644 index 0000000000..b522fae09a --- /dev/null +++ b/src/cpp/ext/filters/census/measures.cc @@ -0,0 +1,129 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/measures.h" + +#include "opencensus/stats/stats.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +namespace grpc { + +using ::opencensus::stats::MeasureDouble; +using ::opencensus::stats::MeasureInt64; + +// These measure definitions should be kept in sync across opencensus +// implementations--see +// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java. + +namespace { + +// Unit constants +constexpr char kUnitBytes[] = "By"; +constexpr char kUnitMilliseconds[] = "ms"; +constexpr char kCount[] = "1"; + +} // namespace + +// Client +MeasureDouble RpcClientSentBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcClientSentBytesPerRpcMeasureName, + "Total bytes sent across all request messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcClientReceivedBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcClientReceivedBytesPerRpcMeasureName, + "Total bytes received across all response messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcClientRoundtripLatency() { + static const auto measure = MeasureDouble::Register( + kRpcClientRoundtripLatencyMeasureName, + "Time between first byte of request sent to last byte of response " + "received, or terminal error", + kUnitMilliseconds); + return measure; +} + +MeasureDouble RpcClientServerLatency() { + static const auto measure = MeasureDouble::Register( + kRpcClientServerLatencyMeasureName, + "Time between first byte of request received to last byte of response " + "sent, or terminal error (propagated from the server)", + kUnitMilliseconds); + return measure; +} + +MeasureInt64 RpcClientSentMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcClientSentMessagesPerRpcMeasureName, + "Number of messages sent per RPC", kCount); + return measure; +} + +MeasureInt64 RpcClientReceivedMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcClientReceivedMessagesPerRpcMeasureName, + "Number of messages received per RPC", kCount); + return measure; +} + +// Server +MeasureDouble RpcServerSentBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcServerSentBytesPerRpcMeasureName, + "Total bytes sent across all messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcServerReceivedBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcServerReceivedBytesPerRpcMeasureName, + "Total bytes received across all messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcServerServerLatency() { + static const auto measure = MeasureDouble::Register( + kRpcServerServerLatencyMeasureName, + "Time between first byte of request received to last byte of response " + "sent, or terminal error", + kUnitMilliseconds); + return measure; +} + +MeasureInt64 RpcServerSentMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcServerSentMessagesPerRpcMeasureName, + "Number of messages sent per RPC", kCount); + return measure; +} + +MeasureInt64 RpcServerReceivedMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcServerReceivedMessagesPerRpcMeasureName, + "Number of messages received per RPC", kCount); + return measure; +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/measures.h b/src/cpp/ext/filters/census/measures.h new file mode 100644 index 0000000000..8f8e72ace2 --- /dev/null +++ b/src/cpp/ext/filters/census/measures.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H + +#include <grpc/support/port_platform.h> + +#include "opencensus/stats/stats.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +namespace grpc { + +::opencensus::stats::MeasureInt64 RpcClientSentMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientSentBytesPerRpc(); +::opencensus::stats::MeasureInt64 RpcClientReceivedMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientReceivedBytesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientRoundtripLatency(); +::opencensus::stats::MeasureDouble RpcClientServerLatency(); +::opencensus::stats::MeasureInt64 RpcClientCompletedRpcs(); + +::opencensus::stats::MeasureInt64 RpcServerSentMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerSentBytesPerRpc(); +::opencensus::stats::MeasureInt64 RpcServerReceivedMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerReceivedBytesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerServerLatency(); +::opencensus::stats::MeasureInt64 RpcServerCompletedRpcs(); + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H */ diff --git a/src/cpp/ext/filters/census/rpc_encoding.cc b/src/cpp/ext/filters/census/rpc_encoding.cc new file mode 100644 index 0000000000..45a66d9dc8 --- /dev/null +++ b/src/cpp/ext/filters/census/rpc_encoding.cc @@ -0,0 +1,39 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/rpc_encoding.h" + +namespace grpc { + +constexpr size_t TraceContextEncoding::kGrpcTraceContextSize; +constexpr size_t TraceContextEncoding::kEncodeDecodeFailure; +constexpr size_t TraceContextEncoding::kVersionIdSize; +constexpr size_t TraceContextEncoding::kFieldIdSize; +constexpr size_t TraceContextEncoding::kVersionIdOffset; +constexpr size_t TraceContextEncoding::kVersionId; + +constexpr size_t RpcServerStatsEncoding::kRpcServerStatsSize; +constexpr size_t RpcServerStatsEncoding::kEncodeDecodeFailure; +constexpr size_t RpcServerStatsEncoding::kVersionIdSize; +constexpr size_t RpcServerStatsEncoding::kFieldIdSize; +constexpr size_t RpcServerStatsEncoding::kVersionIdOffset; +constexpr size_t RpcServerStatsEncoding::kVersionId; + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/rpc_encoding.h b/src/cpp/ext/filters/census/rpc_encoding.h new file mode 100644 index 0000000000..ffffa60c46 --- /dev/null +++ b/src/cpp/ext/filters/census/rpc_encoding.h @@ -0,0 +1,284 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H + +#include <grpc/support/port_platform.h> + +#include <string.h> + +#include "absl/base/internal/endian.h" +#include "absl/strings/string_view.h" +#include "opencensus/trace/span_context.h" +#include "opencensus/trace/span_id.h" +#include "opencensus/trace/trace_id.h" + +namespace grpc { + +// TODO: Rename to GrpcTraceContextV0. +struct GrpcTraceContext { + GrpcTraceContext() {} + + explicit GrpcTraceContext(const ::opencensus::trace::SpanContext& ctx) { + ctx.trace_id().CopyTo(trace_id); + ctx.span_id().CopyTo(span_id); + ctx.trace_options().CopyTo(trace_options); + } + + ::opencensus::trace::SpanContext ToSpanContext() const { + return ::opencensus::trace::SpanContext( + ::opencensus::trace::TraceId(trace_id), + ::opencensus::trace::SpanId(span_id), + ::opencensus::trace::TraceOptions(trace_options)); + } + + // TODO: For performance: + // uint8_t version; + // uint8_t trace_id_field_id; + uint8_t trace_id[::opencensus::trace::TraceId::kSize]; + // uint8_t span_id_field_id; + uint8_t span_id[::opencensus::trace::SpanId::kSize]; + // uint8_t trace_options_field_id; + uint8_t trace_options[::opencensus::trace::TraceOptions::kSize]; +}; + +// TraceContextEncoding encapsulates the logic for encoding and decoding of +// trace contexts. +class TraceContextEncoding { + public: + // Size of encoded GrpcTraceContext. (16 + 8 + 1 + 4) + static constexpr size_t kGrpcTraceContextSize = 29; + // Error value. + static constexpr size_t kEncodeDecodeFailure = 0; + + // Deserializes a GrpcTraceContext from the incoming buffer. Returns the + // number of bytes deserialized from the buffer. If the incoming buffer is + // empty or the encoding version is not supported it will return 0 bytes, + // currently only version 0 is supported. If an unknown field ID is + // encountered it will return immediately without parsing the rest of the + // buffer. Inlined for performance reasons. + static size_t Decode(absl::string_view buf, GrpcTraceContext* tc) { + if (buf.empty()) { + return kEncodeDecodeFailure; + } + uint8_t version = buf[kVersionIdOffset]; + // TODO: Support other versions later. Only support version 0 for + // now. + if (version != kVersionId) { + return kEncodeDecodeFailure; + } + + size_t pos = kVersionIdSize; + while (pos < buf.size()) { + size_t bytes_read = + ParseField(absl::string_view(&buf[pos], buf.size() - pos), tc); + if (bytes_read == 0) { + break; + } else { + pos += bytes_read; + } + } + return pos; + } + + // Serializes a GrpcTraceContext into the provided buffer. Returns the number + // of bytes serialized into the buffer. If the buffer is not of sufficient + // size (it must be at least kGrpcTraceContextSize bytes) it will drop + // everything and return 0 bytes serialized. Inlined for performance reasons. + static size_t Encode(const GrpcTraceContext& tc, char* buf, size_t buf_size) { + if (buf_size < kGrpcTraceContextSize) { + return kEncodeDecodeFailure; + } + buf[kVersionIdOffset] = kVersionId; + buf[kTraceIdOffset] = kTraceIdField; + memcpy(&buf[kTraceIdOffset + 1], tc.trace_id, + opencensus::trace::TraceId::kSize); + buf[kSpanIdOffset] = kSpanIdField; + memcpy(&buf[kSpanIdOffset + 1], tc.span_id, + opencensus::trace::SpanId::kSize); + buf[kTraceOptionsOffset] = kTraceOptionsField; + memcpy(&buf[kTraceOptionsOffset + 1], tc.trace_options, + opencensus::trace::TraceOptions::kSize); + return kGrpcTraceContextSize; + } + + private: + // Parses the next field from the incoming buffer and stores the parsed value + // in a GrpcTraceContext struct. If it does not recognize the field ID it + // will return 0, otherwise it returns the number of bytes read. + static size_t ParseField(absl::string_view buf, GrpcTraceContext* tc) { + // TODO: Add support for multi-byte field IDs. + if (buf.empty()) { + return 0; + } + // Field ID is always the first byte in a field. + uint32_t field_id = buf[0]; + size_t bytes_read = kFieldIdSize; + switch (field_id) { + case kTraceIdField: + bytes_read += kTraceIdSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->trace_id, &buf[kFieldIdSize], + opencensus::trace::TraceId::kSize); + break; + case kSpanIdField: + bytes_read += kSpanIdSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->span_id, &buf[kFieldIdSize], + opencensus::trace::SpanId::kSize); + break; + case kTraceOptionsField: + bytes_read += kTraceOptionsSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->trace_options, &buf[kFieldIdSize], + opencensus::trace::TraceOptions::kSize); + break; + default: // Invalid field ID + return 0; + } + + return bytes_read; + } + + // Size of Version ID. + static constexpr size_t kVersionIdSize = 1; + // Size of Field ID. + static constexpr size_t kFieldIdSize = 1; + + // Offset and value for currently supported version ID. + static constexpr size_t kVersionIdOffset = 0; + static constexpr size_t kVersionId = 0; + + // Fixed Field ID values: + enum FieldIdValue { + kTraceIdField = 0, + kSpanIdField = 1, + kTraceOptionsField = 2, + }; + + // Field data sizes in bytes + enum FieldSize { + kTraceIdSize = 16, + kSpanIdSize = 8, + kTraceOptionsSize = 1, + }; + + // Fixed size offsets for field ID start positions during encoding. Field + // data immediately follows. + enum FieldIdOffset { + kTraceIdOffset = kVersionIdSize, + kSpanIdOffset = kTraceIdOffset + kFieldIdSize + kTraceIdSize, + kTraceOptionsOffset = kSpanIdOffset + kFieldIdSize + kSpanIdSize, + }; + + TraceContextEncoding() = delete; + TraceContextEncoding(const TraceContextEncoding&) = delete; + TraceContextEncoding(TraceContextEncoding&&) = delete; + TraceContextEncoding operator=(const TraceContextEncoding&) = delete; + TraceContextEncoding operator=(TraceContextEncoding&&) = delete; +}; + +// TODO: This may not be needed. Check to see if opencensus requires +// a trailing server response. +// RpcServerStatsEncoding encapsulates the logic for encoding and decoding of +// rpc server stats messages. Rpc server stats consists of a uint64_t time +// value (server latency in nanoseconds). +class RpcServerStatsEncoding { + public: + // Size of encoded RPC server stats. + static constexpr size_t kRpcServerStatsSize = 10; + // Error value. + static constexpr size_t kEncodeDecodeFailure = 0; + + // Deserializes rpc server stats from the incoming 'buf' into *time. Returns + // number of bytes decoded. If the buffer is of insufficient size (it must be + // at least kRpcServerStatsSize bytes) or the encoding version or field ID are + // unrecognized, *time will be set to 0 and it will return + // kEncodeDecodeFailure. Inlined for performance reasons. + static size_t Decode(absl::string_view buf, uint64_t* time) { + if (buf.size() < kRpcServerStatsSize) { + *time = 0; + return kEncodeDecodeFailure; + } + + uint8_t version = buf[kVersionIdOffset]; + uint32_t fieldID = buf[kServerElapsedTimeOffset]; + if (version != kVersionId || fieldID != kServerElapsedTimeField) { + *time = 0; + return kEncodeDecodeFailure; + } + *time = absl::little_endian::Load64( + &buf[kServerElapsedTimeOffset + kFieldIdSize]); + return kRpcServerStatsSize; + } + + // Serializes rpc server stats into the provided buffer. It returns the + // number of bytes written to the buffer. If the buffer is smaller than + // kRpcServerStatsSize bytes it will return kEncodeDecodeFailure. Inlined for + // performance reasons. + static size_t Encode(uint64_t time, char* buf, size_t buf_size) { + if (buf_size < kRpcServerStatsSize) { + return kEncodeDecodeFailure; + } + + buf[kVersionIdOffset] = kVersionId; + buf[kServerElapsedTimeOffset] = kServerElapsedTimeField; + absl::little_endian::Store64(&buf[kServerElapsedTimeOffset + kFieldIdSize], + time); + return kRpcServerStatsSize; + } + + private: + // Size of Version ID. + static constexpr size_t kVersionIdSize = 1; + // Size of Field ID. + static constexpr size_t kFieldIdSize = 1; + + // Offset and value for currently supported version ID. + static constexpr size_t kVersionIdOffset = 0; + static constexpr size_t kVersionId = 0; + + enum FieldIdValue { + kServerElapsedTimeField = 0, + }; + + enum FieldSize { + kServerElapsedTimeSize = 8, + }; + + enum FieldIdOffset { + kServerElapsedTimeOffset = kVersionIdSize, + }; + + RpcServerStatsEncoding() = delete; + RpcServerStatsEncoding(const RpcServerStatsEncoding&) = delete; + RpcServerStatsEncoding(RpcServerStatsEncoding&&) = delete; + RpcServerStatsEncoding operator=(const RpcServerStatsEncoding&) = delete; + RpcServerStatsEncoding operator=(RpcServerStatsEncoding&&) = delete; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H */ diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_filter.cc new file mode 100644 index 0000000000..c7c62eefe5 --- /dev/null +++ b/src/cpp/ext/filters/census/server_filter.cc @@ -0,0 +1,198 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/server_filter.h" + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "opencensus/stats/stats.h" +#include "src/core/lib/surface/call.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/cpp/ext/filters/census/measures.h" + +namespace grpc { + +constexpr uint32_t CensusServerCallData::kMaxServerStatsLen; + +namespace { + +// server metadata elements +struct ServerMetadataElements { + grpc_slice path; + grpc_slice tracing_slice; + grpc_slice census_proto; +}; + +void FilterInitialMetadata(grpc_metadata_batch* b, + ServerMetadataElements* sml) { + if (b->idx.named.path != nullptr) { + sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md)); + } + if (b->idx.named.grpc_trace_bin != nullptr) { + sml->tracing_slice = + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md)); + grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin); + } + if (b->idx.named.grpc_tags_bin != nullptr) { + sml->census_proto = + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md)); + grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin); + } +} + +} // namespace + +void CensusServerCallData::OnDoneRecvMessageCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + CensusServerCallData* calld = + reinterpret_cast<CensusServerCallData*>(elem->call_data); + CensusChannelData* channeld = + reinterpret_cast<CensusChannelData*>(elem->channel_data); + GPR_ASSERT(calld != nullptr); + GPR_ASSERT(channeld != nullptr); + // Stream messages are no longer valid after receiving trailing metadata. + if ((*calld->recv_message_) != nullptr) { + ++calld->recv_message_count_; + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); +} + +void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + CensusServerCallData* calld = + reinterpret_cast<CensusServerCallData*>(elem->call_data); + GPR_ASSERT(calld != nullptr); + if (error == GRPC_ERROR_NONE) { + grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_; + GPR_ASSERT(initial_metadata != nullptr); + ServerMetadataElements sml; + sml.path = grpc_empty_slice(); + sml.tracing_slice = grpc_empty_slice(); + sml.census_proto = grpc_empty_slice(); + FilterInitialMetadata(initial_metadata, &sml); + calld->path_ = grpc_slice_ref_internal(sml.path); + calld->method_ = GetMethod(&calld->path_); + calld->qualified_method_ = StrCat("Recv.", calld->method_); + const char* tracing_str = + GRPC_SLICE_IS_EMPTY(sml.tracing_slice) + ? "" + : reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(sml.tracing_slice)); + size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice) + ? 0 + : GRPC_SLICE_LENGTH(sml.tracing_slice); + const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto) + ? "" + : reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(sml.census_proto)); + size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto) + ? 0 + : GRPC_SLICE_LENGTH(sml.census_proto); + + GenerateServerContext(absl::string_view(tracing_str, tracing_str_len), + absl::string_view(census_str, census_str_len), + /*primary_role*/ "", calld->qualified_method_, + &calld->context_); + + grpc_slice_unref_internal(sml.tracing_slice); + grpc_slice_unref_internal(sml.census_proto); + grpc_slice_unref_internal(sml.path); + grpc_census_call_set_context( + calld->gc_, reinterpret_cast<census_context*>(&calld->context_)); + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_, + GRPC_ERROR_REF(error)); +} + +void CensusServerCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + if (op->recv_initial_metadata() != nullptr) { + // substitute our callback for the op callback + recv_initial_metadata_ = op->recv_initial_metadata()->batch(); + initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready(); + op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_); + } + if (op->send_message() != nullptr) { + ++sent_message_count_; + } + if (op->recv_message() != nullptr) { + recv_message_ = op->op()->payload->recv_message.recv_message; + initial_on_done_recv_message_ = + op->op()->payload->recv_message.recv_message_ready; + op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; + } + // We need to record the time when the trailing metadata was sent to mark the + // completeness of the request. + if (op->send_trailing_metadata() != nullptr) { + elapsed_time_ = absl::Now() - start_time_; + size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_), + stats_buf_, kMaxServerStatsLen); + if (len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_trailing_metadata()->batch(), &census_bin_, + grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_SERVER_STATS_BIN, + grpc_slice_from_copied_buffer(stats_buf_, len)))); + } + } + // Call next op. + grpc_call_next_op(elem, op->op()); +} + +grpc_error* CensusServerCallData::Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + start_time_ = absl::Now(); + gc_ = + grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0)); + GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_, + OnDoneRecvInitialMetadataCb, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, + grpc_schedule_on_exec_ctx); + auth_context_ = grpc_call_auth_context(gc_); + return GRPC_ERROR_NONE; +} + +void CensusServerCallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + const uint64_t request_size = GetOutgoingDataSize(final_info); + const uint64_t response_size = GetIncomingDataSize(final_info); + double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_); + grpc_auth_context_release(auth_context_); + ::opencensus::stats::Record( + {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)}, + {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)}, + {RpcServerServerLatency(), elapsed_time_ms}, + {RpcServerSentMessagesPerRpc(), sent_message_count_}, + {RpcServerReceivedMessagesPerRpc(), recv_message_count_}}, + {{ServerMethodTagKey(), method_}, + {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}}); + grpc_slice_unref_internal(path_); + context_.EndSpan(); +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/server_filter.h b/src/cpp/ext/filters/census/server_filter.h new file mode 100644 index 0000000000..e393ed3283 --- /dev/null +++ b/src/cpp/ext/filters/census/server_filter.h @@ -0,0 +1,101 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H + +#include <grpc/support/port_platform.h> + +#include "absl/strings/string_view.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "include/grpc/grpc_security.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +// A CallData class will be created for every grpc call within a channel. It is +// used to store data and methods specific to that call. CensusServerCallData is +// thread-compatible, however typically only 1 thread should be interacting with +// a call at a time. +class CensusServerCallData : public CallData { + public: + // Maximum size of server stats that are sent on the wire. + static constexpr uint32_t kMaxServerStatsLen = 16; + + CensusServerCallData() + : gc_(nullptr), + auth_context_(nullptr), + recv_initial_metadata_(nullptr), + initial_on_done_recv_initial_metadata_(nullptr), + initial_on_done_recv_message_(nullptr), + recv_message_(nullptr), + recv_message_count_(0), + sent_message_count_(0) { + memset(&census_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&path_, 0, sizeof(grpc_slice)); + memset(&on_done_recv_initial_metadata_, 0, sizeof(grpc_closure)); + memset(&on_done_recv_message_, 0, sizeof(grpc_closure)); + } + + grpc_error* Init(grpc_call_element* elem, + const grpc_call_element_args* args) override; + + void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) override; + + void StartTransportStreamOpBatch(grpc_call_element* elem, + TransportStreamOpBatch* op) override; + + static void OnDoneRecvInitialMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneRecvMessageCb(void* user_data, grpc_error* error); + + private: + CensusContext context_; + // server method + absl::string_view method_; + std::string qualified_method_; + grpc_slice path_; + // Pointer to the grpc_call element + grpc_call* gc_; + // Authorization context for the call. + grpc_auth_context* auth_context_; + // Metadata element for census stats. + grpc_linked_mdelem census_bin_; + // recv callback + grpc_metadata_batch* recv_initial_metadata_; + grpc_closure* initial_on_done_recv_initial_metadata_; + grpc_closure on_done_recv_initial_metadata_; + // recv message + grpc_closure* initial_on_done_recv_message_; + grpc_closure on_done_recv_message_; + absl::Time start_time_; + absl::Duration elapsed_time_; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_; + uint64_t recv_message_count_; + uint64_t sent_message_count_; + // Buffer needed for grpc_slice to reference it when adding metatdata to + // response. + char stats_buf_[kMaxServerStatsLen]; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H */ diff --git a/src/cpp/ext/filters/census/views.cc b/src/cpp/ext/filters/census/views.cc new file mode 100644 index 0000000000..2c0c5f7295 --- /dev/null +++ b/src/cpp/ext/filters/census/views.cc @@ -0,0 +1,491 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +#include "absl/time/time.h" +#include "opencensus/stats/internal/aggregation_window.h" +#include "opencensus/stats/internal/set_aggregation_window.h" +#include "opencensus/stats/stats.h" + +namespace grpc { + +using ::opencensus::stats::Aggregation; +using ::opencensus::stats::AggregationWindow; +using ::opencensus::stats::BucketBoundaries; +using ::opencensus::stats::ViewDescriptor; + +// These measure definitions should be kept in sync across opencensus +// implementations. + +namespace { + +Aggregation BytesDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Explicit( + {0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, + 67108864, 268435456, 1073741824, 4294967296})); +} + +Aggregation MillisDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Explicit( + {0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, + 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, + 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, + 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000})); +} + +Aggregation CountDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Exponential(17, 1.0, 2.0)); +} + +ViewDescriptor MinuteDescriptor() { + auto descriptor = ViewDescriptor(); + SetAggregationWindow(AggregationWindow::Interval(absl::Minutes(1)), + &descriptor); + return descriptor; +} + +ViewDescriptor HourDescriptor() { + auto descriptor = ViewDescriptor(); + SetAggregationWindow(AggregationWindow::Interval(absl::Hours(1)), + &descriptor); + return descriptor; +} + +} // namespace + +void RegisterOpenCensusViewsForExport() { + ClientSentMessagesPerRpcCumulative().RegisterForExport(); + ClientSentBytesPerRpcCumulative().RegisterForExport(); + ClientReceivedMessagesPerRpcCumulative().RegisterForExport(); + ClientReceivedBytesPerRpcCumulative().RegisterForExport(); + ClientRoundtripLatencyCumulative().RegisterForExport(); + ClientServerLatencyCumulative().RegisterForExport(); + + ServerSentMessagesPerRpcCumulative().RegisterForExport(); + ServerSentBytesPerRpcCumulative().RegisterForExport(); + ServerReceivedMessagesPerRpcCumulative().RegisterForExport(); + ServerReceivedBytesPerRpcCumulative().RegisterForExport(); + ServerServerLatencyCumulative().RegisterForExport(); +} + +// client cumulative +const ViewDescriptor& ClientSentBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/cumulative") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/cumulative") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/roundtrip_latency/cumulative") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/server_latency/cumulative") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/completed_rpcs/cumulative") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/cumulative") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/cumulative") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server cumulative +const ViewDescriptor& ServerSentBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/cumulative") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/cumulative") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/elapsed_time/cumulative") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/completed_rpcs/cumulative") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/cumulative") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/cumulative") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +// client minute +const ViewDescriptor& ClientSentBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/minute") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/minute") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/roundtrip_latency/minute") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/server_latency/minute") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/completed_rpcs/minute") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/minute") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/minute") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server minute +const ViewDescriptor& ServerSentBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/minute") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/minute") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/server_latency/minute") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/completed_rpcs/minute") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/minute") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/minute") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +// client hour +const ViewDescriptor& ClientSentBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/hour") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/hour") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/roundtrip_latency/hour") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/server_latency/hour") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/completed_rpcs/hour") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/hour") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/hour") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server hour +const ViewDescriptor& ServerSentBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/hour") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/hour") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/server_latency/hour") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/completed_rpcs/hour") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/hour") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/hour") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +} // namespace grpc diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs index 421b5d379e..39c24d0157 100644 --- a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -24,7 +24,6 @@ namespace Grpc.Core.Interceptors { /// <summary> /// Extends the CallInvoker class to provide the interceptor facility on the client side. - /// This is an EXPERIMENTAL API. /// </summary> public static class CallInvokerExtensions { diff --git a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs index 00b2fa8bec..c7d0c2472a 100644 --- a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs @@ -22,7 +22,6 @@ namespace Grpc.Core.Interceptors { /// <summary> /// Provides extension methods to make it easy to register interceptors on Channel objects. - /// This is an EXPERIMENTAL API. /// </summary> public static class ChannelExtensions { diff --git a/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs b/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs index de06a77077..4665fc553b 100644 --- a/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs +++ b/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs @@ -25,7 +25,6 @@ namespace Grpc.Core.Interceptors { /// <summary> /// Carries along the context associated with intercepted invocations on the client side. - /// This is an EXPERIMENTAL API. /// </summary> public struct ClientInterceptorContext<TRequest, TResponse> where TRequest : class diff --git a/src/csharp/Grpc.Core/Interceptors/Interceptor.cs b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs index 56a30c34af..ab708034f3 100644 --- a/src/csharp/Grpc.Core/Interceptors/Interceptor.cs +++ b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs @@ -25,7 +25,6 @@ namespace Grpc.Core.Interceptors { /// <summary> /// Serves as the base class for gRPC interceptors. - /// This is an EXPERIMENTAL API. /// </summary> public abstract class Interceptor { diff --git a/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs index b9b53247ce..8987544f7f 100644 --- a/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs @@ -24,14 +24,12 @@ namespace Grpc.Core.Interceptors { /// <summary> /// Extends the ServerServiceDefinition class to add methods used to register interceptors on the server side. - /// This is an EXPERIMENTAL API. /// </summary> public static class ServerServiceDefinitionExtensions { /// <summary> /// Returns a <see cref="Grpc.Core.ServerServiceDefinition" /> instance that /// intercepts incoming calls to the underlying service handler through the given interceptor. - /// This is an EXPERIMENTAL API. /// </summary> /// <param name="serverServiceDefinition">The <see cref="Grpc.Core.ServerServiceDefinition" /> instance to register interceptors on.</param> /// <param name="interceptor">The interceptor to intercept the incoming invocations with.</param> @@ -52,7 +50,6 @@ namespace Grpc.Core.Interceptors /// <summary> /// Returns a <see cref="Grpc.Core.ServerServiceDefinition" /> instance that /// intercepts incoming calls to the underlying service handler through the given interceptors. - /// This is an EXPERIMENTAL API. /// </summary> /// <param name="serverServiceDefinition">The <see cref="Grpc.Core.ServerServiceDefinition" /> instance to register interceptors on.</param> /// <param name="interceptors"> diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 60dacbf126..64bb407c57 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -29,7 +29,7 @@ using Grpc.Core.Utils; namespace Grpc.Core { /// <summary> - /// gRPC server. A single server can server arbitrary number of services and can listen on more than one ports. + /// gRPC server. A single server can serve an arbitrary number of services and can listen on more than one port. /// </summary> public class Server { diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 3e6ec474b7..87a2516f8d 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -935,11 +935,12 @@ grpcsharp_ssl_credentials_create(const char* pem_root_certs, if (key_cert_pair_cert_chain || key_cert_pair_private_key) { key_cert_pair.cert_chain = key_cert_pair_cert_chain; key_cert_pair.private_key = key_cert_pair_private_key; - return grpc_ssl_credentials_create(pem_root_certs, &key_cert_pair, NULL); + return grpc_ssl_credentials_create(pem_root_certs, &key_cert_pair, NULL, + NULL); } else { GPR_ASSERT(!key_cert_pair_cert_chain); GPR_ASSERT(!key_cert_pair_private_key); - return grpc_ssl_credentials_create(pem_root_certs, NULL, NULL); + return grpc_ssl_credentials_create(pem_root_certs, NULL, NULL, NULL); } } diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index f4b933751f..348989904a 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -183,14 +183,14 @@ static NSMutableDictionary *kHostCache; grpc_channel_credentials *creds; if (pemPrivateKey == nil && pemCertChain == nil) { - creds = grpc_ssl_credentials_create(rootsASCII.bytes, NULL, NULL); + creds = grpc_ssl_credentials_create(rootsASCII.bytes, NULL, NULL, NULL); } else { grpc_ssl_pem_key_cert_pair key_cert_pair; NSData *privateKeyASCII = [self nullTerminatedDataWithString:pemPrivateKey]; NSData *certChainASCII = [self nullTerminatedDataWithString:pemCertChain]; key_cert_pair.private_key = privateKeyASCII.bytes; key_cert_pair.cert_chain = certChainASCII.bytes; - creds = grpc_ssl_credentials_create(rootsASCII.bytes, &key_cert_pair, NULL); + creds = grpc_ssl_credentials_create(rootsASCII.bytes, &key_cert_pair, NULL, NULL); } @synchronized(self) { diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c index 10d7380ca1..af1372878d 100644 --- a/src/php/ext/grpc/channel_credentials.c +++ b/src/php/ext/grpc/channel_credentials.c @@ -158,7 +158,7 @@ PHP_METHOD(ChannelCredentials, createSsl) { grpc_channel_credentials *creds = grpc_ssl_credentials_create( pem_root_certs, - pem_key_cert_pair.private_key == NULL ? NULL : &pem_key_cert_pair, NULL); + pem_key_cert_pair.private_key == NULL ? NULL : &pem_key_cert_pair, NULL, NULL); zval *creds_object = grpc_php_wrap_channel_credentials(creds, hashstr, false TSRMLS_CC); efree(hashkey); diff --git a/src/proto/census/census.options b/src/proto/census/census.options deleted file mode 100644 index a1f80395c7..0000000000 --- a/src/proto/census/census.options +++ /dev/null @@ -1,3 +0,0 @@ -google.census.Tag.key max_size:255 -google.census.Tag.value max_size:255 -google.census.View.tag_key max_count:15 diff --git a/src/proto/census/census.proto b/src/proto/census/census.proto deleted file mode 100644 index ae7d7763e8..0000000000 --- a/src/proto/census/census.proto +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright 2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package google.census; - -// All the census protos. -// -// Nomenclature note: capitalized names below (like Resource) are protos. -// -// Census lets you define a Resource - something which can be measured, like the -// latency of an RPC, the number of CPU cycles spent on an operation, or -// anything else you care to measure. You can record individual instances of -// measurements (a double value) for every Resource of interest. These -// individual measurements are aggregated together into an Aggregation. There -// are two Aggregation types available: Distribution (describes the -// distribution of all measurements, possibly with a histogram) and -// IntervalStats (the count and mean of measurements across specified time -// periods). An Aggregation is described by an AggregationDescriptor. -// -// You can define how your stats are broken down by Tag values and which -// Aggregations to use through a View. The corresponding combination of -// Resource/View/Aggregation which is available to census clients is called a -// Metric. - - -// The following two types are copied from -// google/protobuf/{duration,timestamp}.proto. Ideally, we would be able to -// import them, but this causes compilation issues on C-based systems -// (e.g. https://koti.kapsi.fi/jpa/nanopb/), which cannot process the C++ -// headers generated from the standard protobuf distribution. See the relevant -// proto files for full documentation of these types. - -message Duration { - // Signed seconds of the span of time. Must be from -315,576,000,000 - // to +315,576,000,000 inclusive. - int64 seconds = 1; - - // Signed fractions of a second at nanosecond resolution of the span - // of time. Durations less than one second are represented with a 0 - // `seconds` field and a positive or negative `nanos` field. For durations - // of one second or more, a non-zero value for the `nanos` field must be - // of the same sign as the `seconds` field. Must be from -999,999,999 - // to +999,999,999 inclusive. - int32 nanos = 2; -} - -message Timestamp { - // Represents seconds of UTC time since Unix epoch - // 1970-01-01T00:00:00Z. Must be from from 0001-01-01T00:00:00Z to - // 9999-12-31T23:59:59Z inclusive. - int64 seconds = 1; - - // Non-negative fractions of a second at nanosecond resolution. Negative - // second values with fractions must still have non-negative nanos values - // that count forward in time. Must be from 0 to 999,999,999 - // inclusive. - int32 nanos = 2; -} - -// Describes a Resource. -message Resource { - // name of resource, e.g. rpc_latency, cpu. Must be unique. - string name = 1; - - // More detailed description of the resource, used in documentation. - string description = 2; - - // Fundamental units of measurement supported by Census - // TODO(aveitch): expand this to include other S.I. units? - enum BasicUnit { - UNKNOWN = 0; - BITS = 1; - BYTES = 2; - SECS = 3; - CORES = 4; - MAX_UNITS = 5; - } - - // MeasurementUnit lets you build compound units of the form - // 10^n * (A * B * ...) / (X * Y * ...), - // where the elements in the numerator and denominator are all BasicUnits. A - // MeasurementUnit must have at least one BasicUnit in its numerator. - // - // To specify multiplication in the numerator or denominator, simply specify - // multiple numerator or denominator fields. For example: - // - // - byte-seconds (i.e. bytes * seconds): - // numerator: BYTES - // numerator: SECS - // - // - events/sec^2 (i.e. rate of change of events/sec): - // numerator: COUNT - // denominator: SECS - // denominator: SECS - // - // To specify multiples (in power of 10) of units, specify a non-zero prefix - // value, for example: - // - // - MB/s (i.e. megabytes / s): - // prefix: 6 - // numerator: BYTES - // denominator: SECS - // - // - nanoseconds - // prefix: -9 - // numerator: SECS - message MeasurementUnit { - int32 prefix = 1; - repeated BasicUnit numerator = 2; - repeated BasicUnit denominator = 3; - } - - // The units in which Resource values are measured. - MeasurementUnit unit = 3; -} - -// An Aggregation summarizes a series of individual Resource measurements, an -// AggregationDescriptor describes an Aggregation. -message AggregationDescriptor { - enum AggregationType { - // Unspecified. Should not be used. - UNKNOWN = 0; - // A count of measurements made. - COUNT = 1; - // A Distribution. - DISTRIBUTION = 2; - // Counts over fixed time intervals. - INTERVAL = 3; - } - // The type of Aggregation. - AggregationType type = 1; - - // At most one set of options. It is illegal to specifiy an option for - // COUNT Aggregations. interval_boundaries must be set for INTERVAL types. - // bucket_boundaries are optional for DISTRIBUTION types. - oneof options { - // Defines histogram bucket boundaries for Distributions. - BucketBoundaries bucket_boundaries = 2; - // Defines the time windows to record for IntervalStats. - IntervalBoundaries interval_boundaries = 3; - } - - // A Distribution may optionally contain a histogram of the values in the - // population. The bucket boundaries for that histogram are described by - // `bucket_boundaries`. This defines `size(bounds) + 1` (= N) buckets. The - // boundaries for bucket index i are: - // - // [-infinity, bounds[i]) for i == 0 - // [bounds[i-1], bounds[i]) for 0 < i < N-2 - // [bounds[i-1], +infinity) for i == N-1 - // - // i.e. an underflow bucket (number 0), zero or more finite buckets (1 - // through N - 2, and an overflow bucket (N - 1), with inclusive lower - // bounds and exclusive upper bounds. - // - // There must be at least one element in `bounds`. If `bounds` has only one - // element, there are no finite buckets, and that single element is the - // common boundary of the overflow and underflow buckets. - message BucketBoundaries { - // The values must be monotonically increasing. - repeated double bounds = 1; - } - - // For Interval stats, describe the size of each window. - message IntervalBoundaries { - // For each time window, specify a duration in seconds. - repeated double window_size = 1; - } -} - -// Distribution contains summary statistics for a population of values and, -// optionally, a histogram representing the distribution of those values across -// a specified set of histogram buckets, as defined in -// Aggregation.bucket_options. -// -// The summary statistics are the count, mean, minimum, and the maximum of the -// set of population of values. -// -// Although it is not forbidden, it is generally a bad idea to include -// non-finite values (infinities or NaNs) in the population of values, as this -// will render the `mean` field meaningless. -message Distribution { - // The number of values in the population. Must be non-negative. - int64 count = 1; - - // The arithmetic mean of the values in the population. If `count` is zero - // then this field must be zero. - double mean = 2; - - // Describes a range of population values. - message Range { - // The minimum of the population values. - double min = 1; - // The maximum of the population values. - double max = 2; - } - - // The range of the population values. If `count` is zero, this field will not - // be defined. - Range range = 3; - - // A Distribution may optionally contain a histogram of the values in the - // population. The histogram is given in `bucket_count` as counts of values - // that fall into one of a sequence of non-overlapping buckets, as described - // by `AggregationDescriptor.options.bucket_boundaries`. - // The sum of the values in `bucket_counts` must equal the value in `count`. - // - // Bucket counts are given in order under the numbering scheme described - // above (the underflow bucket has number 0; the finite buckets, if any, - // have numbers 1 through N-2; the overflow bucket has number N-1). - // - // The size of `bucket_count` must be no greater than N as defined in - // `bucket_boundaries`. - // - // Any suffix of trailing zero bucket_count fields may be omitted. - repeated int64 bucket_count = 4; -} - -// Record summary stats over various time windows. -message IntervalStats { - // Summary statistic over a single time window. - message Window { - // The window duration. Must be positive. - Duration window_size = 1; - // The number of measurements in this window. - int64 count = 2; - // The arithmetic mean of all measurements in the window. - double mean = 3; - } - - // Full set of windows for this aggregation. - repeated Window window = 1; -} - -// A Tag: key-value pair. -message Tag { - string key = 1; - string value = 2; -} - -// A View specifies an Aggregation and a set of tag keys. The Aggregation will -// be broken down by the unique set of matching tag values for each measurement. -message View { - // Name of view. Must be unique. - string name = 1; - - // More detailed description, for documentation purposes. - string description = 2; - - // Name of Resource to be broken down for this view. - string resource_name = 3; - - // Aggregation type to associate with this View. - AggregationDescriptor aggregation = 4; - - // Tag keys to match with a given Resource measurement. If no keys are - // specified, then all stats are recorded. Keys must be unique. - repeated string tag_key = 5; -} - -// An Aggregation summarizes a series of individual Resource measurements. -message Aggregation { - // Name of this aggregation. - string name = 1; - - // More detailed description, for documentation purposes. - string description = 2; - - // The data for this Aggregation. - oneof data { - uint64 count = 3; - Distribution distribution = 4; - IntervalStats interval_stats = 5; - } - - // Tags associated with this Aggregation. - repeated Tag tag = 6; -} - -// A Metric represents all the Aggregations for a particular view. -message Metric { - // View associated with this Metric. - string view_name = 1; - - // Aggregations - each will have a unique set of tag values for the tag_keys - // associated with the corresponding View. - repeated Aggregation aggregation = 2; - - // Start and end timestamps over which the metric was accumulated. These - // values are not relevant/defined for IntervalStats aggregations, which are - // always accumulated over a fixed time period. - Timestamp start = 3; - Timestamp end = 4; -} diff --git a/src/proto/census/trace_context.options b/src/proto/census/trace_context.options deleted file mode 100644 index e69de29bb2..0000000000 --- a/src/proto/census/trace_context.options +++ /dev/null diff --git a/src/proto/census/trace_context.proto b/src/proto/census/trace_context.proto deleted file mode 100644 index 7e5087dbee..0000000000 --- a/src/proto/census/trace_context.proto +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package google.trace; - -// Tracing information that is propagated with RPC's. -message TraceContext { - // A TraceId uniquely represents a single Trace. It is a 128-bit nonce. - // The 128-bit ID is split into 2 64-bit chunks. (REQUIRED) - fixed64 trace_id_hi = 1; - fixed64 trace_id_lo = 2; - // ID of parent (client) span. (REQUIRED) - fixed64 span_id = 3; - // Span option flags. First bit is true if this trace is sampled. (OPTIONAL) - fixed32 span_options = 4; -} diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index f4ccfbc016..d2c0389ca6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -142,12 +142,12 @@ cdef class SSLChannelCredentials(ChannelCredentials): c_pem_root_certificates = self._pem_root_certificates if self._private_key is None and self._certificate_chain is None: return grpc_ssl_credentials_create( - c_pem_root_certificates, NULL, NULL) + c_pem_root_certificates, NULL, NULL, NULL) else: c_pem_key_certificate_pair.private_key = self._private_key c_pem_key_certificate_pair.certificate_chain = self._certificate_chain return grpc_ssl_credentials_create( - c_pem_root_certificates, &c_pem_key_certificate_pair, NULL) + c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL) cdef class CompositeChannelCredentials(ChannelCredentials): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index cfefeaf938..bcbfec0c9f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -453,11 +453,14 @@ cdef extern from "grpc/grpc_security.h": # We don't care about the internals (and in fact don't know them) pass - ctypedef struct grpc_ssl_session_cache: # We don't care about the internals (and in fact don't know them) pass + ctypedef struct verify_peer_options: + # We don't care about the internals (and in fact don't know them) + pass + ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs) grpc_ssl_session_cache *grpc_ssl_session_cache_create_lru(size_t capacity) @@ -469,7 +472,7 @@ cdef extern from "grpc/grpc_security.h": grpc_channel_credentials *grpc_google_default_credentials_create() nogil grpc_channel_credentials *grpc_ssl_credentials_create( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, - void *reserved) nogil + verify_peer_options *verify_options, void *reserved) nogil grpc_channel_credentials *grpc_composite_channel_credentials_create( grpc_channel_credentials *creds1, grpc_call_credentials *creds2, void *reserved) nogil diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a64fb42f9b..b20b8155a0 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -346,6 +346,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', + 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc', @@ -353,7 +354,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc', 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc', - 'src/core/ext/census/grpc_context.cc', + 'src/cpp/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc', diff --git a/src/ruby/end2end/package_with_underscore_checker.rb b/src/ruby/end2end/package_with_underscore_checker.rb new file mode 100644 index 0000000000..27ea00ffa7 --- /dev/null +++ b/src/ruby/end2end/package_with_underscore_checker.rb @@ -0,0 +1,54 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'open3' +require 'tmpdir' + +def main + root_dir = File.join(File.dirname(__FILE__), '..', '..', '..') + pb_dir = File.join(root_dir, 'src', 'ruby', 'end2end', 'protos') + + fail 'CONFIG env variable unexpectedly unset' unless ENV['CONFIG'] + bins_sub_dir = ENV['CONFIG'] + bins_dir = File.join(root_dir, 'bins', bins_sub_dir) + + plugin = File.join(bins_dir, 'grpc_ruby_plugin') + protoc = File.join(bins_dir, 'protobuf', 'protoc') + + got = nil + + Dir.mktmpdir do |tmp_dir| + gen_out = File.join(tmp_dir, 'package_with_underscore', 'service_services_pb.rb') + + pid = spawn( + protoc, + "--proto_path=#{pb_dir}", + 'package_with_underscore/service.proto', + "--grpc_out=#{tmp_dir}", + "--plugin=protoc-gen-grpc=#{plugin}" + ) + Process.waitpid2(pid) + File.open(gen_out) { |f| got = f.read } + end + + correct_modularized_rpc = 'rpc :TestOne, ' \ + 'Grpc::Testing::PackageWithUnderscore::Data::Request, ' \ + 'Grpc::Testing::PackageWithUnderscore::Data::Response' + + return if got.include?(correct_modularized_rpc) + + fail 'generated file does not match with correct_modularized_rpc' +end + +main diff --git a/src/ruby/spec/pb/package_with_underscore/data.proto b/src/ruby/end2end/protos/package_with_underscore/data.proto index 2706f1d7be..2706f1d7be 100644 --- a/src/ruby/spec/pb/package_with_underscore/data.proto +++ b/src/ruby/end2end/protos/package_with_underscore/data.proto diff --git a/src/ruby/spec/pb/package_with_underscore/service.proto b/src/ruby/end2end/protos/package_with_underscore/service.proto index 814c7898cd..814c7898cd 100644 --- a/src/ruby/spec/pb/package_with_underscore/service.proto +++ b/src/ruby/end2end/protos/package_with_underscore/service.proto diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c index b23a32caf1..178224c6e0 100644 --- a/src/ruby/ext/grpc/rb_channel_credentials.c +++ b/src/ruby/ext/grpc/rb_channel_credentials.c @@ -159,12 +159,12 @@ static VALUE grpc_rb_channel_credentials_init(int argc, VALUE* argv, pem_root_certs_cstr = RSTRING_PTR(pem_root_certs); } if (pem_private_key == Qnil && pem_cert_chain == Qnil) { - creds = grpc_ssl_credentials_create(pem_root_certs_cstr, NULL, NULL); + creds = grpc_ssl_credentials_create(pem_root_certs_cstr, NULL, NULL, NULL); } else { key_cert_pair.private_key = RSTRING_PTR(pem_private_key); key_cert_pair.cert_chain = RSTRING_PTR(pem_cert_chain); - creds = - grpc_ssl_credentials_create(pem_root_certs_cstr, &key_cert_pair, NULL); + creds = grpc_ssl_credentials_create(pem_root_certs_cstr, &key_cert_pair, + NULL, NULL); } if (creds == NULL) { rb_raise(rb_eRuntimeError, "could not create a credentials, not sure why"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 3cc6492d04..474405ae3f 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -311,7 +311,7 @@ extern grpc_google_default_credentials_create_type grpc_google_default_credentia typedef void(*grpc_set_ssl_roots_override_callback_type)(grpc_ssl_roots_override_callback cb); extern grpc_set_ssl_roots_override_callback_type grpc_set_ssl_roots_override_callback_import; #define grpc_set_ssl_roots_override_callback grpc_set_ssl_roots_override_callback_import -typedef grpc_channel_credentials*(*grpc_ssl_credentials_create_type)(const char* pem_root_certs, grpc_ssl_pem_key_cert_pair* pem_key_cert_pair, void* reserved); +typedef grpc_channel_credentials*(*grpc_ssl_credentials_create_type)(const char* pem_root_certs, grpc_ssl_pem_key_cert_pair* pem_key_cert_pair, const verify_peer_options* verify_options, void* reserved); extern grpc_ssl_credentials_create_type grpc_ssl_credentials_create_import; #define grpc_ssl_credentials_create grpc_ssl_credentials_create_import typedef void(*grpc_call_credentials_release_type)(grpc_call_credentials* creds); diff --git a/src/ruby/spec/call_credentials_spec.rb b/src/ruby/spec/call_credentials_spec.rb index 86f30b46f1..d949ba2235 100644 --- a/src/ruby/spec/call_credentials_spec.rb +++ b/src/ruby/spec/call_credentials_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' describe GRPC::Core::CallCredentials do CallCredentials = GRPC::Core::CallCredentials diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb index 1cc0500242..22d345f658 100644 --- a/src/ruby/spec/call_spec.rb +++ b/src/ruby/spec/call_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' include GRPC::Core::StatusCodes diff --git a/src/ruby/spec/channel_credentials_spec.rb b/src/ruby/spec/channel_credentials_spec.rb index e53f316208..b05e5aebf8 100644 --- a/src/ruby/spec/channel_credentials_spec.rb +++ b/src/ruby/spec/channel_credentials_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' describe GRPC::Core::ChannelCredentials do ChannelCredentials = GRPC::Core::ChannelCredentials diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb index 5b2e355963..3c9eca47ec 100644 --- a/src/ruby/spec/channel_spec.rb +++ b/src/ruby/spec/channel_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' def load_test_certs test_root = File.join(File.dirname(__FILE__), 'testdata') diff --git a/src/ruby/spec/client_auth_spec.rb b/src/ruby/spec/client_auth_spec.rb index b955ad231e..c12ed2e0f4 100644 --- a/src/ruby/spec/client_auth_spec.rb +++ b/src/ruby/spec/client_auth_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' def create_channel_creds test_root = File.join(File.dirname(__FILE__), 'testdata') diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index afbfb0bc43..afde5073a6 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' include GRPC::Core diff --git a/src/ruby/spec/compression_options_spec.rb b/src/ruby/spec/compression_options_spec.rb index 03c3cd9f07..05318e6686 100644 --- a/src/ruby/spec/compression_options_spec.rb +++ b/src/ruby/spec/compression_options_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' describe GRPC::Core::CompressionOptions do # Note these constants should be updated diff --git a/src/ruby/spec/error_sanity_spec.rb b/src/ruby/spec/error_sanity_spec.rb index b8f0638695..c36635ea26 100644 --- a/src/ruby/spec/error_sanity_spec.rb +++ b/src/ruby/spec/error_sanity_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' StatusCodes = GRPC::Core::StatusCodes diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index da50f8d0c9..3f878cc100 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' Thread.abort_on_exception = true diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 6852b9804f..b4ffe5eb92 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' require 'grpc/generic/rpc_desc' describe GRPC::RpcDesc do diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb index 27a42c82c1..0f2dac272e 100644 --- a/src/ruby/spec/generic/rpc_server_pool_spec.rb +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' Thread.abort_on_exception = true diff --git a/src/ruby/spec/generic/service_spec.rb b/src/ruby/spec/generic/service_spec.rb index ad1c268d32..57aec89ce5 100644 --- a/src/ruby/spec/generic/service_spec.rb +++ b/src/ruby/spec/generic/service_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' require 'grpc/generic/rpc_desc' require 'grpc/generic/service' diff --git a/src/ruby/spec/google_rpc_status_utils_spec.rb b/src/ruby/spec/google_rpc_status_utils_spec.rb index 3263589b6a..1f67d3a916 100644 --- a/src/ruby/spec/google_rpc_status_utils_spec.rb +++ b/src/ruby/spec/google_rpc_status_utils_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' require_relative '../lib/grpc/google_rpc_status_utils' require_relative '../pb/src/proto/grpc/testing/messages_pb' require_relative '../pb/src/proto/grpc/testing/messages_pb' diff --git a/src/ruby/spec/pb/duplicate/codegen_spec.rb b/src/ruby/spec/pb/duplicate/codegen_spec.rb index 9f82858d15..fed7c1c8be 100644 --- a/src/ruby/spec/pb/duplicate/codegen_spec.rb +++ b/src/ruby/spec/pb/duplicate/codegen_spec.rb @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +require 'spec_helper' require 'open3' require 'tmpdir' diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb index 58a602327c..84e63c2e54 100644 --- a/src/ruby/spec/pb/health/checker_spec.rb +++ b/src/ruby/spec/pb/health/checker_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' require 'grpc/health/v1/health_pb' require 'grpc/health/checker' require 'open3' diff --git a/src/ruby/spec/pb/package_with_underscore/checker_spec.rb b/src/ruby/spec/pb/package_with_underscore/checker_spec.rb deleted file mode 100644 index dac7c14a9a..0000000000 --- a/src/ruby/spec/pb/package_with_underscore/checker_spec.rb +++ /dev/null @@ -1,51 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -require 'open3' -require 'tmpdir' - -describe 'Package with underscore protobuf code generation' do - it 'should have the same content as created by code generation' do - root_dir = File.join(File.dirname(__FILE__), '..', '..', '..', '..', '..') - pb_dir = File.join(root_dir, 'src', 'ruby', 'spec', 'pb') - - fail 'CONFIG env variable unexpectedly unset' unless ENV['CONFIG'] - bins_sub_dir = ENV['CONFIG'] - bins_dir = File.join(root_dir, 'bins', bins_sub_dir) - - plugin = File.join(bins_dir, 'grpc_ruby_plugin') - protoc = File.join(bins_dir, 'protobuf', 'protoc') - - got = nil - - Dir.mktmpdir do |tmp_dir| - gen_out = File.join(tmp_dir, 'package_with_underscore', 'service_services_pb.rb') - - pid = spawn( - protoc, - '-I.', - 'package_with_underscore/service.proto', - "--grpc_out=#{tmp_dir}", - "--plugin=protoc-gen-grpc=#{plugin}", - chdir: pb_dir) - Process.waitpid2(pid) - File.open(gen_out) { |f| got = f.read } - end - - correct_modularized_rpc = 'rpc :TestOne, ' \ - 'Grpc::Testing::PackageWithUnderscore::Data::Request, ' \ - 'Grpc::Testing::PackageWithUnderscore::Data::Response' - expect(got).to include(correct_modularized_rpc) - end -end diff --git a/src/ruby/spec/server_credentials_spec.rb b/src/ruby/spec/server_credentials_spec.rb index 673d832f62..f773a5f701 100644 --- a/src/ruby/spec/server_credentials_spec.rb +++ b/src/ruby/spec/server_credentials_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' def load_test_certs test_root = File.join(File.dirname(__FILE__), 'testdata') diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb index 6eaac5ded1..76038d8b8b 100644 --- a/src/ruby/spec/server_spec.rb +++ b/src/ruby/spec/server_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' def load_test_certs test_root = File.join(File.dirname(__FILE__), 'testdata') diff --git a/src/ruby/spec/spec_helper.rb b/src/ruby/spec/spec_helper.rb index 8fe9e6e808..8fe2c16b35 100644 --- a/src/ruby/spec/spec_helper.rb +++ b/src/ruby/spec/spec_helper.rb @@ -31,6 +31,7 @@ end if ENV['COVERAGE_NAME'] require 'rspec' require 'logging' require 'rspec/logging_helper' +require 'grpc' require_relative 'support/services' require_relative 'support/helpers' diff --git a/src/ruby/spec/support/services.rb b/src/ruby/spec/support/services.rb index 27cc8e61ac..27239cd66c 100644 --- a/src/ruby/spec/support/services.rb +++ b/src/ruby/spec/support/services.rb @@ -13,7 +13,7 @@ # limitations under the License. # Test stubs for various scenarios -require 'grpc' +require 'spec_helper' # A test message class EchoMsg diff --git a/src/ruby/spec/time_consts_spec.rb b/src/ruby/spec/time_consts_spec.rb index f7afe6b70a..41655ab106 100644 --- a/src/ruby/spec/time_consts_spec.rb +++ b/src/ruby/spec/time_consts_spec.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'grpc' +require 'spec_helper' TimeConsts = GRPC::Core::TimeConsts |