diff options
author | 2018-06-21 10:01:06 -0700 | |
---|---|---|
committer | 2018-06-21 10:01:06 -0700 | |
commit | 1517f8090815aec45b530a931a18684c1f5d8ee8 (patch) | |
tree | 7fab1f683b93ea48ea75a8a86aea7e3e5080ba53 /src | |
parent | fb082835793cacfd7b64eaebc68836baccf82895 (diff) | |
parent | 222655cab54028db55d8a24e08de26fbe84dcf8b (diff) |
Merge branch 'master' into epollex-ownerfd-fix
Diffstat (limited to 'src')
24 files changed, 644 insertions, 205 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 34ea97e23e..520431e63b 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -794,6 +794,15 @@ typedef struct { // The batch to use in the subchannel call. // Its payload field points to subchannel_call_retry_state.batch_payload. grpc_transport_stream_op_batch batch; + // For intercepting on_complete. + grpc_closure on_complete; +} subchannel_batch_data; + +// Retry state associated with a subchannel call. +// Stored in the parent_data of the subchannel call object. +typedef struct { + // subchannel_batch_data.batch.payload points to this. + grpc_transport_stream_op_batch_payload batch_payload; // For send_initial_metadata. // Note that we need to make a copy of the initial metadata for each // subchannel call instead of just referring to the copy in call_data, @@ -818,15 +827,6 @@ typedef struct { grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; grpc_closure recv_trailing_metadata_ready; - // For intercepting on_complete. - grpc_closure on_complete; -} subchannel_batch_data; - -// Retry state associated with a subchannel call. -// Stored in the parent_data of the subchannel call object. -typedef struct { - // subchannel_batch_data.batch.payload points to this. - grpc_transport_stream_op_batch_payload batch_payload; // These fields indicate which ops have been started and completed on // this subchannel call. size_t started_send_message_count; @@ -1524,17 +1524,21 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem, static void batch_data_unref(subchannel_batch_data* batch_data) { if (gpr_unref(&batch_data->refs)) { - if (batch_data->send_initial_metadata_storage != nullptr) { - grpc_metadata_batch_destroy(&batch_data->send_initial_metadata); + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + if (batch_data->batch.send_initial_metadata) { + grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); } - if (batch_data->send_trailing_metadata_storage != nullptr) { - grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata); + if (batch_data->batch.send_trailing_metadata) { + grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); } if (batch_data->batch.recv_initial_metadata) { - grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata); + grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); } if (batch_data->batch.recv_trailing_metadata) { - grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata); + grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); } GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref"); call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); @@ -1560,8 +1564,12 @@ static void invoke_recv_initial_metadata_callback(void* arg, }); GPR_ASSERT(pending != nullptr); // Return metadata. + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); grpc_metadata_batch_move( - &batch_data->recv_initial_metadata, + &retry_state->recv_initial_metadata, pending->batch->payload->recv_initial_metadata.recv_initial_metadata); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking @@ -1606,7 +1614,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { // the recv_trailing_metadata_ready callback, then defer propagating this // callback back to the surface. We can evaluate whether to retry when // recv_trailing_metadata comes back. - if (GPR_UNLIKELY((batch_data->trailing_metadata_available || + if (GPR_UNLIKELY((retry_state->trailing_metadata_available || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { if (grpc_client_channel_trace.enabled()) { @@ -1651,8 +1659,12 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { }); GPR_ASSERT(pending != nullptr); // Return payload. + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); *pending->batch->payload->recv_message.recv_message = - std::move(batch_data->recv_message); + std::move(retry_state->recv_message); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. @@ -1693,7 +1705,7 @@ static void recv_message_ready(void* arg, grpc_error* error) { // callback back to the surface. We can evaluate whether to retry when // recv_trailing_metadata comes back. if (GPR_UNLIKELY( - (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && + (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, @@ -1766,8 +1778,12 @@ static void add_closure_for_recv_trailing_metadata_ready( return; } // Return metadata. + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); grpc_metadata_batch_move( - &batch_data->recv_trailing_metadata, + &retry_state->recv_trailing_metadata, pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); // Add closure. closures->Add(pending->batch->payload->recv_trailing_metadata @@ -1788,11 +1804,11 @@ static void add_closures_for_deferred_recv_callbacks( // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != nullptr)) { - GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, + GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, invoke_recv_initial_metadata_callback, retry_state->recv_initial_metadata_ready_deferred_batch, grpc_schedule_on_exec_ctx); - closures->Add(&batch_data->recv_initial_metadata_ready, + closures->Add(&retry_state->recv_initial_metadata_ready, retry_state->recv_initial_metadata_error, "resuming recv_initial_metadata_ready"); retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; @@ -1800,11 +1816,11 @@ static void add_closures_for_deferred_recv_callbacks( // Add closure for deferred recv_message_ready. if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != nullptr)) { - GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, + GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, invoke_recv_message_callback, retry_state->recv_message_ready_deferred_batch, grpc_schedule_on_exec_ctx); - closures->Add(&batch_data->recv_message_ready, + closures->Add(&retry_state->recv_message_ready, retry_state->recv_message_error, "resuming recv_message_ready"); retry_state->recv_message_ready_deferred_batch = nullptr; @@ -2120,28 +2136,28 @@ static void add_retriable_send_initial_metadata_op( // // If we've already completed one or more attempts, add the // grpc-retry-attempts header. - batch_data->send_initial_metadata_storage = + retry_state->send_initial_metadata_storage = static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * (calld->send_initial_metadata.list.count + (calld->num_attempts_completed > 0)))); grpc_metadata_batch_copy(&calld->send_initial_metadata, - &batch_data->send_initial_metadata, - batch_data->send_initial_metadata_storage); - if (GPR_UNLIKELY(batch_data->send_initial_metadata.idx.named + &retry_state->send_initial_metadata, + retry_state->send_initial_metadata_storage); + if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named .grpc_previous_rpc_attempts != nullptr)) { - grpc_metadata_batch_remove( - &batch_data->send_initial_metadata, - batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts); + grpc_metadata_batch_remove(&retry_state->send_initial_metadata, + retry_state->send_initial_metadata.idx.named + .grpc_previous_rpc_attempts); } if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) { grpc_mdelem retry_md = grpc_mdelem_from_slices( GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, *retry_count_strings[calld->num_attempts_completed - 1]); grpc_error* error = grpc_metadata_batch_add_tail( - &batch_data->send_initial_metadata, - &batch_data->send_initial_metadata_storage[calld->send_initial_metadata - .list.count], + &retry_state->send_initial_metadata, + &retry_state->send_initial_metadata_storage[calld->send_initial_metadata + .list.count], retry_md); if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { gpr_log(GPR_ERROR, "error adding retry metadata: %s", @@ -2152,7 +2168,7 @@ static void add_retriable_send_initial_metadata_op( retry_state->started_send_initial_metadata = true; batch_data->batch.send_initial_metadata = true; batch_data->batch.payload->send_initial_metadata.send_initial_metadata = - &batch_data->send_initial_metadata; + &retry_state->send_initial_metadata; batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags = calld->send_initial_metadata_flags; batch_data->batch.payload->send_initial_metadata.peer_string = @@ -2173,10 +2189,10 @@ static void add_retriable_send_message_op( grpc_core::ByteStreamCache* cache = (*calld->send_messages)[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; - batch_data->send_message.Init(cache); + retry_state->send_message.Init(cache); batch_data->batch.send_message = true; batch_data->batch.payload->send_message.send_message.reset( - batch_data->send_message.get()); + retry_state->send_message.get()); } // Adds retriable send_trailing_metadata op to batch_data. @@ -2186,17 +2202,17 @@ static void add_retriable_send_trailing_metadata_op( // We need to make a copy of the metadata batch for each attempt, since // the filters in the subchannel stack may modify this batch, and we don't // want those modifications to be passed forward to subsequent attempts. - batch_data->send_trailing_metadata_storage = + retry_state->send_trailing_metadata_storage = static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * calld->send_trailing_metadata.list.count)); grpc_metadata_batch_copy(&calld->send_trailing_metadata, - &batch_data->send_trailing_metadata, - batch_data->send_trailing_metadata_storage); + &retry_state->send_trailing_metadata, + retry_state->send_trailing_metadata_storage); retry_state->started_send_trailing_metadata = true; batch_data->batch.send_trailing_metadata = true; batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata = - &batch_data->send_trailing_metadata; + &retry_state->send_trailing_metadata; } // Adds retriable recv_initial_metadata op to batch_data. @@ -2205,16 +2221,16 @@ static void add_retriable_recv_initial_metadata_op( subchannel_batch_data* batch_data) { retry_state->started_recv_initial_metadata = true; batch_data->batch.recv_initial_metadata = true; - grpc_metadata_batch_init(&batch_data->recv_initial_metadata); + grpc_metadata_batch_init(&retry_state->recv_initial_metadata); batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata = - &batch_data->recv_initial_metadata; + &retry_state->recv_initial_metadata; batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available = - &batch_data->trailing_metadata_available; - GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, + &retry_state->trailing_metadata_available; + GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, recv_initial_metadata_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready = - &batch_data->recv_initial_metadata_ready; + &retry_state->recv_initial_metadata_ready; } // Adds retriable recv_message op to batch_data. @@ -2224,11 +2240,11 @@ static void add_retriable_recv_message_op( ++retry_state->started_recv_message_count; batch_data->batch.recv_message = true; batch_data->batch.payload->recv_message.recv_message = - &batch_data->recv_message; - GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready, + &retry_state->recv_message; + GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_message.recv_message_ready = - &batch_data->recv_message_ready; + &retry_state->recv_message_ready; } // Adds retriable recv_trailing_metadata op to batch_data. @@ -2237,16 +2253,17 @@ static void add_retriable_recv_trailing_metadata_op( subchannel_batch_data* batch_data) { retry_state->started_recv_trailing_metadata = true; batch_data->batch.recv_trailing_metadata = true; - grpc_metadata_batch_init(&batch_data->recv_trailing_metadata); + grpc_metadata_batch_init(&retry_state->recv_trailing_metadata); batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = - &batch_data->recv_trailing_metadata; + &retry_state->recv_trailing_metadata; batch_data->batch.payload->recv_trailing_metadata.collect_stats = - &batch_data->collect_stats; - GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready, + &retry_state->collect_stats; + GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready, recv_trailing_metadata_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready; + .recv_trailing_metadata_ready = + &retry_state->recv_trailing_metadata_ready; } // Helper function used to start a recv_trailing_metadata batch. This @@ -2400,11 +2417,9 @@ static void add_subchannel_batches_for_pending_batches( // started subchannel batch, since we'll propagate the // completion when it completes. if (retry_state->completed_recv_trailing_metadata) { - subchannel_batch_data* batch_data = - retry_state->recv_trailing_metadata_internal_batch; // Batches containing recv_trailing_metadata always succeed. closures->Add( - &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE, + &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE, "re-executing recv_trailing_metadata_ready to propagate " "internally triggered result"); } else { diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 34d5e6e218..8b73b01dea 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -139,22 +139,27 @@ static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s, static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s, const char* staller) { - gpr_log( - GPR_DEBUG, - "%s:%p stream %d stalled by %s [fc:pending=%" PRIdPTR - ":pending-compressed=%" PRIdPTR ":flowed=%" PRId64 - ":peer_initwin=%d:t_win=%" PRId64 ":s_win=%d:s_delta=%" PRId64 "]", - t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length, - s->compressed_data_buffer.length, s->flow_controlled_bytes_flowed, - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - t->flow_control->remote_window(), - static_cast<uint32_t> GPR_MAX( - 0, - s->flow_control->remote_window_delta() + - (int64_t)t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), - s->flow_control->remote_window_delta()); + if (grpc_flowctl_trace.enabled()) { + gpr_log( + GPR_DEBUG, + "%s:%p stream %d moved to stalled list by %s. This is FULLY expected " + "to happen in a healthy program that is not seeing flow control stalls." + " However, if you know that there are unwanted stalls, here is some " + "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR + ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64 + ":s_win=%d:s_delta=%" PRId64 "]", + t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length, + s->compressed_data_buffer.length, s->flow_controlled_bytes_flowed, + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + t->flow_control->remote_window(), + static_cast<uint32_t> GPR_MAX( + 0, + s->flow_control->remote_window_delta() + + (int64_t)t->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), + s->flow_control->remote_window_delta()); + } } static bool stream_ref_if_not_destroyed(gpr_refcount* r) { diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc index eb7214b355..0f655d8716 100644 --- a/src/core/lib/channel/channel_trace.cc +++ b/src/core/lib/channel/channel_trace.cc @@ -28,7 +28,6 @@ #include <stdlib.h> #include <string.h> -#include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" @@ -40,16 +39,17 @@ #include "src/core/lib/transport/error_utils.h" namespace grpc_core { +namespace channelz { ChannelTrace::TraceEvent::TraceEvent( Severity severity, grpc_slice data, - RefCountedPtr<ChannelTrace> referenced_tracer, ReferencedType type) + RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type) : severity_(severity), data_(data), timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(), GPR_CLOCK_REALTIME)), next_(nullptr), - referenced_tracer_(std::move(referenced_tracer)), + referenced_channel_(std::move(referenced_channel)), referenced_type_(type) {} ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data) @@ -62,15 +62,13 @@ ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data) ChannelTrace::TraceEvent::~TraceEvent() { grpc_slice_unref_internal(data_); } ChannelTrace::ChannelTrace(size_t max_events) - : channel_uuid_(-1), - num_events_logged_(0), + : num_events_logged_(0), list_size_(0), max_list_size_(max_events), head_trace_(nullptr), tail_trace_(nullptr) { if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 gpr_mu_init(&tracer_mu_); - channel_uuid_ = ChannelzRegistry::Register(this); time_created_ = grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(), GPR_CLOCK_REALTIME); } @@ -83,12 +81,9 @@ ChannelTrace::~ChannelTrace() { it = it->next(); Delete<TraceEvent>(to_free); } - ChannelzRegistry::Unregister(channel_uuid_); gpr_mu_destroy(&tracer_mu_); } -intptr_t ChannelTrace::GetUuid() const { return channel_uuid_; } - void ChannelTrace::AddTraceEventHelper(TraceEvent* new_trace_event) { ++num_events_logged_; // first event case @@ -117,20 +112,21 @@ void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) { void ChannelTrace::AddTraceEventReferencingChannel( Severity severity, grpc_slice data, - RefCountedPtr<ChannelTrace> referenced_tracer) { + RefCountedPtr<ChannelNode> referenced_channel) { if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 // create and fill up the new event - AddTraceEventHelper( - New<TraceEvent>(severity, data, std::move(referenced_tracer), Channel)); + AddTraceEventHelper(New<TraceEvent>( + severity, data, std::move(referenced_channel), ReferencedType::Channel)); } void ChannelTrace::AddTraceEventReferencingSubchannel( Severity severity, grpc_slice data, - RefCountedPtr<ChannelTrace> referenced_tracer) { + RefCountedPtr<ChannelNode> referenced_subchannel) { if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 // create and fill up the new event - AddTraceEventHelper(New<TraceEvent>( - severity, data, std::move(referenced_tracer), Subchannel)); + AddTraceEventHelper(New<TraceEvent>(severity, data, + std::move(referenced_subchannel), + ReferencedType::Subchannel)); } namespace { @@ -193,22 +189,24 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const { json_iterator = grpc_json_create_child(json_iterator, json, "timestamp", fmt_time(timestamp_), GRPC_JSON_STRING, true); - if (referenced_tracer_ != nullptr) { + if (referenced_channel_ != nullptr) { char* uuid_str; - gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_tracer_->channel_uuid_); + gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid()); grpc_json* child_ref = grpc_json_create_child( json_iterator, json, - (referenced_type_ == Channel) ? "channelRef" : "subchannelRef", nullptr, - GRPC_JSON_OBJECT, false); + (referenced_type_ == ReferencedType::Channel) ? "channelRef" + : "subchannelRef", + nullptr, GRPC_JSON_OBJECT, false); json_iterator = grpc_json_create_child( nullptr, child_ref, - (referenced_type_ == Channel) ? "channelId" : "subchannelId", uuid_str, - GRPC_JSON_STRING, true); + (referenced_type_ == ReferencedType::Channel) ? "channelId" + : "subchannelId", + uuid_str, GRPC_JSON_STRING, true); json_iterator = child_ref; } } -char* ChannelTrace::RenderTrace() const { +grpc_json* ChannelTrace::RenderJSON() const { if (!max_list_size_) return nullptr; // tracing is disabled if max_events == 0 grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT); @@ -219,7 +217,7 @@ char* ChannelTrace::RenderTrace() const { grpc_json_create_child(json_iterator, json, "numEventsLogged", num_events_logged_str, GRPC_JSON_STRING, true); json_iterator = - grpc_json_create_child(json_iterator, json, "creationTime", + grpc_json_create_child(json_iterator, json, "creationTimestamp", fmt_time(time_created_), GRPC_JSON_STRING, true); grpc_json* events = grpc_json_create_child(json_iterator, json, "events", nullptr, GRPC_JSON_ARRAY, false); @@ -231,9 +229,8 @@ char* ChannelTrace::RenderTrace() const { it->RenderTraceEvent(json_iterator); it = it->next(); } - char* json_str = grpc_json_dump_to_string(json, 0); - grpc_json_destroy(json); - return json_str; + return json; } +} // namespace channelz } // namespace grpc_core diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h index 1df1e585f2..0dd162a777 100644 --- a/src/core/lib/channel/channel_trace.h +++ b/src/core/lib/channel/channel_trace.h @@ -28,18 +28,18 @@ #include "src/core/lib/json/json.h" namespace grpc_core { +namespace channelz { + +class ChannelNode; // Object used to hold live data for a channel. This data is exposed via the // channelz service: // https://github.com/grpc/proposal/blob/master/A14-channelz.md -class ChannelTrace : public RefCounted<ChannelTrace> { +class ChannelTrace { public: ChannelTrace(size_t max_events); ~ChannelTrace(); - // returns the tracer's uuid - intptr_t GetUuid() const; - enum Severity { Unset = 0, // never to be used Info, // we start at 1 to avoid using proto default values @@ -59,34 +59,30 @@ class ChannelTrace : public RefCounted<ChannelTrace> { // created a new subchannel, then it would record that with a TraceEvent // referencing the new subchannel. // - // TODO(ncteisen): Once channelz is implemented, the events should reference - // the overall channelz object, not just the ChannelTrace object. // TODO(ncteisen): as this call is used more and more throughout the gRPC // stack, determine if it makes more sense to accept a char* instead of a // slice. void AddTraceEventReferencingChannel( Severity severity, grpc_slice data, - RefCountedPtr<ChannelTrace> referenced_tracer); + RefCountedPtr<ChannelNode> referenced_channel); void AddTraceEventReferencingSubchannel( Severity severity, grpc_slice data, - RefCountedPtr<ChannelTrace> referenced_tracer); + RefCountedPtr<ChannelNode> referenced_subchannel); - // Returns the tracing data rendered as a grpc json string. - // The string is owned by the caller and must be freed. - char* RenderTrace() const; + // Creates and returns the raw grpc_json object, so a parent channelz + // object may incorporate the json before rendering. + grpc_json* RenderJSON() const; private: // Types of objects that can be references by trace events. - enum ReferencedType { Channel, Subchannel }; + enum class ReferencedType { Channel, Subchannel }; // Private class to encapsulate all the data and bookkeeping needed for a // a trace event. class TraceEvent { public: // Constructor for a TraceEvent that references a different channel. - // TODO(ncteisen): once channelz is implemented, this should reference the - // overall channelz object, not just the ChannelTrace object TraceEvent(Severity severity, grpc_slice data, - RefCountedPtr<ChannelTrace> referenced_tracer, + RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type); // Constructor for a TraceEvent that does not reverence a different @@ -109,7 +105,7 @@ class ChannelTrace : public RefCounted<ChannelTrace> { gpr_timespec timestamp_; TraceEvent* next_; // the tracer object for the (sub)channel that this trace event refers to. - RefCountedPtr<ChannelTrace> referenced_tracer_; + RefCountedPtr<ChannelNode> referenced_channel_; // the type that the referenced tracer points to. Unused if this trace // does not point to any channel or subchannel ReferencedType referenced_type_; @@ -119,7 +115,6 @@ class ChannelTrace : public RefCounted<ChannelTrace> { void AddTraceEventHelper(TraceEvent* new_trace_event); gpr_mu tracer_mu_; - intptr_t channel_uuid_; uint64_t num_events_logged_; size_t list_size_; size_t max_list_size_; @@ -128,6 +123,7 @@ class ChannelTrace : public RefCounted<ChannelTrace> { gpr_timespec time_created_; }; +} // namespace channelz } // namespace grpc_core #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_TRACE_H */ diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc new file mode 100644 index 0000000000..3550fc0551 --- /dev/null +++ b/src/core/lib/channel/channelz.cc @@ -0,0 +1,185 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/impl/codegen/port_platform.h> + +#include "src/core/lib/channel/channelz.h" + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "src/core/lib/channel/channelz_registry.h" +#include "src/core/lib/channel/status_util.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/error_utils.h" + +namespace grpc_core { +namespace channelz { + +namespace { + +// TODO(ncteisen): move this function to a common helper location. +// +// returns an allocated string that represents tm according to RFC-3339, and, +// more specifically, follows: +// https://developers.google.com/protocol-buffers/docs/proto3#json +// +// "Uses RFC 3339, where generated output will always be Z-normalized and uses +// 0, 3, 6 or 9 fractional digits." +char* fmt_time(gpr_timespec tm) { + char time_buffer[35]; + char ns_buffer[11]; // '.' + 9 digits of precision + struct tm* tm_info = localtime((const time_t*)&tm.tv_sec); + strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%dT%H:%M:%S", tm_info); + snprintf(ns_buffer, 11, ".%09d", tm.tv_nsec); + // This loop trims off trailing zeros by inserting a null character that the + // right point. We iterate in chunks of three because we want 0, 3, 6, or 9 + // fractional digits. + for (int i = 7; i >= 1; i -= 3) { + if (ns_buffer[i] == '0' && ns_buffer[i + 1] == '0' && + ns_buffer[i + 2] == '0') { + ns_buffer[i] = '\0'; + // Edge case in which all fractional digits were 0. + if (i == 1) { + ns_buffer[0] = '\0'; + } + } else { + break; + } + } + char* full_time_str; + gpr_asprintf(&full_time_str, "%s%sZ", time_buffer, ns_buffer); + return full_time_str; +} + +// TODO(ncteisen); move this to json library +grpc_json* add_num_str(grpc_json* parent, grpc_json* it, const char* name, + int64_t num) { + char* num_str; + gpr_asprintf(&num_str, "%" PRId64, num); + return grpc_json_create_child(it, parent, name, num_str, GRPC_JSON_STRING, + true); +} + +} // namespace + +ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes) + : channel_(channel), target_(nullptr), channel_uuid_(-1) { + trace_.Init(channel_tracer_max_nodes); + target_ = UniquePtr<char>(grpc_channel_get_target(channel_)); + channel_uuid_ = ChannelzRegistry::Register(this); + gpr_atm_no_barrier_store(&last_call_started_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +ChannelNode::~ChannelNode() { + trace_.Destroy(); + ChannelzRegistry::Unregister(channel_uuid_); +} + +void ChannelNode::RecordCallStarted() { + gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1); + gpr_atm_no_barrier_store(&last_call_started_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +grpc_connectivity_state ChannelNode::GetConnectivityState() { + if (channel_ == nullptr) { + return GRPC_CHANNEL_SHUTDOWN; + } else { + return grpc_channel_check_connectivity_state(channel_, false); + } +} + +char* ChannelNode::RenderJSON() { + // We need to track these three json objects to build our object + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + // create and fill the ref child + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = add_num_str(json, json_iterator, "channelId", channel_uuid_); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + // create and fill the connectivity state child. + grpc_connectivity_state connectivity_state = GetConnectivityState(); + json_iterator = grpc_json_create_child(json_iterator, json, "state", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + grpc_json_create_child(nullptr, json, "state", + grpc_connectivity_state_name(connectivity_state), + GRPC_JSON_STRING, false); + // reset the parent to be the data object. + json = data; + json_iterator = grpc_json_create_child( + json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false); + // fill in the channel trace if applicable + grpc_json* trace = trace_->RenderJSON(); + if (trace != nullptr) { + // we manuall link up and fill the child since it was created for us in + // ChannelTrace::RenderJSON + json_iterator = grpc_json_link_child(json, trace, json_iterator); + trace->parent = json; + trace->value = nullptr; + trace->key = "trace"; + trace->owns_value = false; + } + // reset the parent to be the data object. + json = data; + json_iterator = nullptr; + // We use -1 as sentinel values since proto default value for integers is + // zero, and the confuses the parser into thinking the value weren't present + json_iterator = + add_num_str(json, json_iterator, "callsStarted", calls_started_); + json_iterator = + add_num_str(json, json_iterator, "callsSucceeded", calls_succeeded_); + json_iterator = + add_num_str(json, json_iterator, "callsFailed", calls_failed_); + gpr_timespec ts = + grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); + json_iterator = + grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", + fmt_time(ts), GRPC_JSON_STRING, true); + // render and return the over json object + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} + +} // namespace channelz +} // namespace grpc_core diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h new file mode 100644 index 0000000000..2aad1e82f4 --- /dev/null +++ b/src/core/lib/channel/channelz.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CHANNELZ_H +#define GRPC_CORE_LIB_CHANNEL_CHANNELZ_H + +#include <grpc/impl/codegen/port_platform.h> + +#include <grpc/grpc.h> + +#include "src/core/lib/channel/channel_trace.h" +#include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/json/json.h" + +namespace grpc_core { +namespace channelz { + +namespace testing { +class ChannelNodePeer; +} + +class ChannelNode : public RefCounted<ChannelNode> { + public: + ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes); + ~ChannelNode(); + + void RecordCallStarted(); + void RecordCallFailed() { + gpr_atm_no_barrier_fetch_add(&calls_failed_, (gpr_atm(1))); + } + void RecordCallSucceeded() { + gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1))); + } + + char* RenderJSON(); + + ChannelTrace* trace() { return trace_.get(); } + + void set_channel_destroyed() { + GPR_ASSERT(channel_ != nullptr); + channel_ = nullptr; + } + + intptr_t channel_uuid() { return channel_uuid_; } + + private: + // testing peer friend. + friend class testing::ChannelNodePeer; + + // helper for getting connectivity state. + grpc_connectivity_state GetConnectivityState(); + + grpc_channel* channel_ = nullptr; + UniquePtr<char> target_; + gpr_atm calls_started_ = 0; + gpr_atm calls_succeeded_ = 0; + gpr_atm calls_failed_ = 0; + gpr_atm last_call_started_millis_ = 0; + intptr_t channel_uuid_; + ManualConstructor<ChannelTrace> trace_; +}; + +} // namespace channelz +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_CHANNEL_CHANNELZ_H */ diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h index cf1a7be648..14bb081e93 100644 --- a/src/core/lib/iomgr/socket_utils.h +++ b/src/core/lib/iomgr/socket_utils.h @@ -23,12 +23,21 @@ #include <stddef.h> +// TODO(juanlishen): The following functions might be simple enough to implement +// ourselves, so that they don't cause any portability hassle. + /* A wrapper for htons on POSIX and Windows */ uint16_t grpc_htons(uint16_t hostshort); /* A wrapper for ntohs on POSIX and WINDOWS */ uint16_t grpc_ntohs(uint16_t netshort); +/* A wrapper for htonl on POSIX and Windows */ +uint32_t grpc_htonl(uint32_t hostlong); + +/* A wrapper for ntohl on POSIX and WINDOWS */ +uint32_t grpc_ntohl(uint32_t netlong); + /* A wrapper for inet_pton on POSIX and WINDOWS */ int grpc_inet_pton(int af, const char* src, void* dst); diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index caee652307..c4b991c94d 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -339,6 +339,10 @@ uint16_t grpc_htons(uint16_t hostshort) { return htons(hostshort); } uint16_t grpc_ntohs(uint16_t netshort) { return ntohs(netshort); } +uint32_t grpc_htonl(uint32_t hostlong) { return htonl(hostlong); } + +uint32_t grpc_ntohl(uint32_t netlong) { return ntohl(netlong); } + int grpc_inet_pton(int af, const char* src, void* dst) { return inet_pton(af, src, dst); } diff --git a/src/core/lib/iomgr/socket_utils_uv.cc b/src/core/lib/iomgr/socket_utils_uv.cc index 7eba40c46b..b5f96b52df 100644 --- a/src/core/lib/iomgr/socket_utils_uv.cc +++ b/src/core/lib/iomgr/socket_utils_uv.cc @@ -33,6 +33,10 @@ uint16_t grpc_htons(uint16_t hostshort) { return htons(hostshort); } uint16_t grpc_ntohs(uint16_t netshort) { return ntohs(netshort); } +uint32_t grpc_htonl(uint32_t hostlong) { return htonl(hostlong); } + +uint32_t grpc_ntohl(uint32_t netlong) { return ntohl(netlong); } + int grpc_inet_pton(int af, const char* src, void* dst) { return inet_pton(af, src, dst); } diff --git a/src/core/lib/iomgr/socket_utils_windows.cc b/src/core/lib/iomgr/socket_utils_windows.cc index 3e7b5b812d..9137ab98e6 100644 --- a/src/core/lib/iomgr/socket_utils_windows.cc +++ b/src/core/lib/iomgr/socket_utils_windows.cc @@ -31,6 +31,10 @@ uint16_t grpc_htons(uint16_t hostshort) { return htons(hostshort); } uint16_t grpc_ntohs(uint16_t netshort) { return ntohs(netshort); } +uint32_t grpc_htonl(uint32_t hostlong) { return htonl(hostlong); } + +uint32_t grpc_ntohl(uint32_t netlong) { return ntohl(netlong); } + int grpc_inet_pton(int af, const char* src, void* dst) { return inet_pton(af, src, dst); } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 135a128aae..556eb234b4 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -489,6 +489,12 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, &call->pollent); } + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + channelz_channel->RecordCallStarted(); + } + grpc_slice_unref_internal(path); return error; @@ -531,7 +537,6 @@ static void release_call(void* call, grpc_error* error) { GRPC_CHANNEL_INTERNAL_UNREF(channel, "call"); } -static void set_status_value_directly(grpc_status_code status, void* dest); static void destroy_call(void* call, grpc_error* error) { GPR_TIMER_SCOPE("destroy_call", 0); size_t i; @@ -1087,13 +1092,12 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) { if (b->idx.named.grpc_status != nullptr) { grpc_status_code status_code = grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md); - grpc_error* error = - status_code == GRPC_STATUS_OK - ? GRPC_ERROR_NONE - : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Error received from peer"), - GRPC_ERROR_INT_GRPC_STATUS, - static_cast<intptr_t>(status_code)); + grpc_error* error = GRPC_ERROR_NONE; + if (status_code != GRPC_STATUS_OK) { + error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error received from peer"), + GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code)); + } if (b->idx.named.grpc_message != nullptr) { error = grpc_error_set_str( error, GRPC_ERROR_STR_GRPC_MESSAGE, @@ -1260,6 +1264,15 @@ static void post_batch_completion(batch_control* bctl) { get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, nullptr, nullptr); } + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + if (*call->final_op.client.status != GRPC_STATUS_OK) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index a466b325be..d5d75fcb2a 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -32,6 +32,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_trace.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" @@ -66,7 +67,7 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call* registered_calls; - grpc_core::RefCountedPtr<grpc_core::ChannelTrace> tracer; + grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_channel; char* target; }; @@ -103,6 +104,7 @@ grpc_channel* grpc_channel_create_with_builder( channel->target = target; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); size_t channel_tracer_max_nodes = 0; // default to off + bool channelz_enabled = false; gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = nullptr; @@ -141,15 +143,20 @@ grpc_channel* grpc_channel_create_with_builder( const grpc_integer_options options = {0, 0, INT_MAX}; channel_tracer_max_nodes = (size_t)grpc_channel_arg_get_integer(&args->args[i], options); + } else if (0 == strcmp(args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) { + channelz_enabled = grpc_channel_arg_get_bool(&args->args[i], false); } } grpc_channel_args_destroy(args); - channel->tracer = grpc_core::MakeRefCounted<grpc_core::ChannelTrace>( - channel_tracer_max_nodes); - channel->tracer->AddTraceEvent( - grpc_core::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("Channel created")); + if (channelz_enabled) { + channel->channelz_channel = + grpc_core::MakeRefCounted<grpc_core::channelz::ChannelNode>( + channel, channel_tracer_max_nodes); + channel->channelz_channel->trace()->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Channel created")); + } return channel; } @@ -184,12 +191,9 @@ static grpc_channel_args* build_channel_args( return grpc_channel_args_copy_and_add(input_args, new_args, num_new_args); } -char* grpc_channel_get_trace(grpc_channel* channel) { - return channel->tracer->RenderTrace(); -} - -intptr_t grpc_channel_get_uuid(grpc_channel* channel) { - return channel->tracer->GetUuid(); +grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node( + grpc_channel* channel) { + return channel->channelz_channel.get(); } grpc_channel* grpc_channel_create(const char* target, @@ -395,6 +399,10 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) { static void destroy_channel(void* arg, grpc_error* error) { grpc_channel* channel = static_cast<grpc_channel*>(arg); + if (channel->channelz_channel != nullptr) { + channel->channelz_channel->set_channel_destroyed(); + channel->channelz_channel.reset(); + } grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); while (channel->registered_calls) { registered_call* rc = channel->registered_calls; @@ -403,7 +411,6 @@ static void destroy_channel(void* arg, grpc_error* error) { GRPC_MDELEM_UNREF(rc->authority); gpr_free(rc); } - channel->tracer.reset(); gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel->target); gpr_free(channel); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 288313951e..e5ff2c3596 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -23,6 +23,7 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/surface/channel_stack_type.h" grpc_channel* grpc_channel_create(const char* target, @@ -50,6 +51,9 @@ grpc_call* grpc_channel_create_pollset_set_call( /** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel); +grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node( + grpc_channel* channel); + /** Get a grpc_mdelem of grpc-status: X where X is the numeric value of status_code. diff --git a/src/cpp/README.md b/src/cpp/README.md index ac76bf74ed..f16cab1f44 100644 --- a/src/cpp/README.md +++ b/src/cpp/README.md @@ -1,13 +1,62 @@ # Overview -This directory contains source code for C++ implementation of gRPC. +A C++ implementation of gRPC -To install gRPC for C++ on your system, follow the instructions to build from source -[here](../../INSTALL.md). This also installs the protocol buffer compiler -`protoc` (if you don't have it already), and the C++ gRPC plugin for `protoc`. +# To start using gRPC C++ -# Documentation +In the C++ world, there's no universally accepted standard for managing project dependencies. +Therefore, gRPC supports several major build systems, which should satisfy most users. + +## bazel + +We recommend using Bazel for projects that use gRPC as it will give you the best developer experience +(easy handling of dependencies that support bazel & fast builds). + +To add gRPC as a dependency in bazel: +1. determine commit SHA for the grpc release you want to use +2. Use the [http_archive](https://docs.bazel.build/versions/master/be/workspace.html#http_archive) bazel rule to include gRPC source + ``` + http_archive( + name = "grpc", + urls = [ + "https://github.com/grpc/grpc/archive/YOUR_GRPC_COMMIT_SHA.tar.gz", + ], + strip_prefix = "grpc-YOUR_GRPC_COMMIT_SHA", + ) + ``` + +NOTE: currently bazel is only supported for building gRPC on Linux. + +## make + +Currently the default choice for building on UNIX based systems is `make`. + +To install gRPC for C++ on your system using `make`, follow the [Building gRPC C++](../../BUILDING.md) +instructions to build from source and then install locally using `make install`. +This also installs the protocol buffer compiler `protoc` (if you don't have it already), +and the C++ gRPC plugin for `protoc`. + +WARNING: After installing with `make install` there is no easy way to uninstall, which can cause issues +if you later want to remove the grpc and/or protobuf installation or upgrade to a newer version. + +## cmake + +`cmake` is the default build option on Windows, but also works on Linux, MacOS. `cmake` has good +support for crosscompiling and can be used for targeting Android platform. + +If your project is using cmake, there are several ways to add gRPC dependency. +- install gRPC via cmake first and then locate it with `find_package(gRPC CONFIG)`. [Example](../../examples/cpp/helloworld/CMakeLists.txt) +- via cmake's `ExternalProject_Add` using a technique called "superbuild". [Example](../../examples/cpp/helloworld/cmake_externalproject/CMakeLists.txt) +- add gRPC source tree to your project (preferrably as a git submodule) and add it to your cmake project with `add_subdirectory`. [Example](../../examples/cpp/helloworld/CMakeLists.txt) + +## Packaging systems + +There's no standard packaging system for C++. We've looked into supporting some (e.g. Conan and vcpkg) but we are not there yet. +Contributions and community-maintained packages for popular packaging systems are welcome! + + +## Examples & Additional Documentation You can find out how to build and run our simplest gRPC C++ example in our [C++ quick start](../../examples/cpp). @@ -25,7 +74,6 @@ documentation site at [grpc.io](https://grpc.io), specifically: APIs. -# Examples +# To start developing gRPC C++ -Code examples for gRPC C++ live in this repository's -[examples/cpp](../../examples/cpp) directory. +For instructions on how to build gRPC C++ from source, follow the [Building gRPC C++](../../BUILDING.md) instructions. diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index 293f4b1c25..940d42d100 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -119,8 +119,10 @@ void CensusClientCallData::StartTransportStreamOpBatch( } if (op->recv_trailing_metadata() != nullptr) { recv_trailing_metadata_ = op->recv_trailing_metadata()->batch(); - initial_on_done_recv_trailing_metadata_ = op->on_complete(); - op->set_on_complete(&on_done_recv_trailing_metadata_); + initial_on_done_recv_trailing_metadata_ = + op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &on_done_recv_trailing_metadata_; } // Call next op. grpc_call_next_op(elem, op->op()); diff --git a/src/csharp/README.md b/src/csharp/README.md index e117e66afd..92be4bf1e5 100644 --- a/src/csharp/README.md +++ b/src/csharp/README.md @@ -42,7 +42,7 @@ If you are a user of gRPC C#, go to Usage section above. - [dotnet SDK](https://www.microsoft.com/net/core) - [Mono 4+](https://www.mono-project.com/) (only needed for Linux and MacOS) -- Prerequisites mentioned in [INSTALL.md](../../INSTALL.md#pre-requisites) +- Prerequisites mentioned in [BUILDING.md](../../BUILDING.md#pre-requisites) to be able to compile the native code. **Windows, Linux or Mac OS X** @@ -93,6 +93,6 @@ THE NATIVE DEPENDENCY Internally, gRPC C# uses a native library written in C (gRPC C core) and invokes its functionality via P/Invoke. The fact that a native library is used should be fully transparent to the users and just installing the `Grpc.Core` NuGet package is the only step needed to use gRPC C# on all supported platforms. -[API Reference]: https://grpc.io/grpc/csharp/ +[API Reference]: https://grpc.io/grpc/csharp/api/Grpc.Core.html [Helloworld Example]: ../../examples/csharp/helloworld [RouteGuide Tutorial]: https://grpc.io/docs/tutorials/basic/csharp.html diff --git a/src/proto/grpc/channelz/channelz.proto b/src/proto/grpc/channelz/channelz.proto index 14db66a654..d930dfcfb4 100644 --- a/src/proto/grpc/channelz/channelz.proto +++ b/src/proto/grpc/channelz/channelz.proto @@ -1,4 +1,4 @@ -// Copyright 2018 gRPC authors. +// Copyright 2018 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,20 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. +// This file defines an interface for exporting monitoring information +// out of gRPC servers. See the full design at +// https://github.com/grpc/proposal/blob/master/A14-channelz.md +// +// The canonical version of this proto can be found at +// https://github.com/grpc/grpc-proto/blob/master/grpc/channelz/v1/channelz.proto + syntax = "proto3"; -package grpc.channelz; +package grpc.channelz.v1; import "google/protobuf/any.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "google/protobuf/wrappers.proto"; -// See go/grpc-channelz. +option go_package = "google.golang.org/grpc/channelz/grpc_channelz_v1"; +option java_multiple_files = true; +option java_package = "io.grpc.channelz.v1"; +option java_outer_classname = "ChannelzProto"; // Channel is a logical grouping of channels, subchannels, and sockets. message Channel { - // The identifier for this channel. + // The identifier for this channel. This should bet set. ChannelRef ref = 1; // Data specific to this channel. ChannelData data = 2; @@ -43,7 +53,7 @@ message Channel { repeated SubchannelRef subchannel_ref = 4; // There are no ordering guarantees on the order of sockets. - repeated SocketRef socket = 5; + repeated SocketRef socket_ref = 5; } // Subchannel is a logical grouping of channels, subchannels, and sockets. @@ -67,7 +77,7 @@ message Subchannel { repeated SubchannelRef subchannel_ref = 4; // There are no ordering guarantees on the order of sockets. - repeated SocketRef socket = 5; + repeated SocketRef socket_ref = 5; } // These come from the specified states in this document: @@ -84,20 +94,23 @@ message ChannelConnectivityState { State state = 1; } +// Channel data is data related to a specific Channel or Subchannel. message ChannelData { - + // The connectivity state of the channel or subchannel. Implementations + // should always set this. ChannelConnectivityState state = 1; // The target this channel originally tried to connect to. May be absent string target = 2; + // A trace of recent events on the channel. May be absent. ChannelTrace trace = 3; // The number of calls started on the channel int64 calls_started = 4; // The number of calls that have completed with an OK status int64 calls_succeeded = 5; - // The number of calls that have a completed with a non-OK status + // The number of calls that have completed with a non-OK status int64 calls_failed = 6; // The last time a call was started on the channel. @@ -130,26 +143,29 @@ message ChannelTraceEvent { } } +// ChannelTrace represents the recent events that have occurred on the channel. message ChannelTrace { // Number of events ever logged in this tracing object. This can differ from // events.size() because events can be overwritten or garbage collected by // implementations. int64 num_events_logged = 1; // Time that this channel was created. - google.protobuf.Timestamp creation_time = 2; + google.protobuf.Timestamp creation_timestamp = 2; // List of events that have occurred on this channel. repeated ChannelTraceEvent events = 3; } +// ChannelRef is a reference to a Channel. message ChannelRef { // The globally unique id for this channel. Must be a positive number. int64 channel_id = 1; // An optional name associated with the channel. string name = 2; // Intentionally don't use field numbers from other refs. - reserved 3, 4, 5, 6; + reserved 3, 4, 5, 6, 7, 8; } +// ChannelRef is a reference to a Subchannel. message SubchannelRef { // The globally unique id for this subchannel. Must be a positive number. int64 subchannel_id = 7; @@ -159,6 +175,7 @@ message SubchannelRef { reserved 1, 2, 3, 4, 5, 6; } +// SocketRef is a reference to a Socket. message SocketRef { int64 socket_id = 3; // An optional name associated with the socket. @@ -167,8 +184,9 @@ message SocketRef { reserved 1, 2, 5, 6, 7, 8; } +// ServerRef is a reference to a Server. message ServerRef { - // A globally unique identifier for this server. Must be a positive number. + // A globally unique identifier for this server. Must be a positive number. int64 server_id = 5; // An optional name associated with the server. string name = 6; @@ -176,16 +194,22 @@ message ServerRef { reserved 1, 2, 3, 4, 7, 8; } +// Server represents a single server. There may be multiple servers in a single +// program. message Server { + // The identifier for a Server. This should be set. ServerRef ref = 1; + // The associated data of the Server. ServerData data = 2; // The sockets that the server is listening on. There are no ordering - // guarantees. + // guarantees. This may be absent. repeated SocketRef listen_socket = 3; } +// ServerData is data for a specific Server. message ServerData { + // A trace of recent events on the server. May be absent. ChannelTrace trace = 1; // The number of incoming calls started on the server @@ -201,13 +225,17 @@ message ServerData { // Information about an actual connection. Pronounced "sock-ay". message Socket { + // The identifier for the Socket. SocketRef ref = 1; + // Data specific to this Socket. SocketData data = 2; // The locally bound address. Address local = 3; // The remote bound address. May be absent. Address remote = 4; + // Security details for this socket. May be absent if not available, or + // there is no security on the socket. Security security = 5; // Optional, represents the name of the remote endpoint, if different than @@ -215,17 +243,23 @@ message Socket { string remote_name = 6; } +// SocketData is data associated for a specific Socket. The fields present +// are specific to the implementation, so there may be minor differences in +// the semantics. (e.g. flow control windows) message SocketData { // The number of streams that have been started. int64 streams_started = 1; - // The number of streams that have ended successfully with the EoS bit set for - // both end points + // The number of streams that have ended successfully: + // On client side, received frame with eos bit set; + // On server side, sent frame with eos bit set. int64 streams_succeeded = 2; - // The number of incoming streams that have a completed with a non-OK status + // The number of streams that have ended unsuccessfully: + // On client side, ended without receiving frame with eos bit set; + // On server side, ended without sending frame with eos bit set. int64 streams_failed = 3; - - // The number of messages successfully sent on this socket. + // The number of grpc messages successfully sent on this socket. int64 messages_sent = 4; + // The number of grpc messages received on this socket. int64 messages_received = 5; // The number of keep alives sent. This is typically implemented with HTTP/2 @@ -254,12 +288,14 @@ message SocketData { // include stream level or TCP level flow control info. google.protobuf.Int64Value remote_flow_control_window = 12; + // Socket options set on this socket. May be absent. repeated SocketOption option = 13; } +// Address represents the address used to create the socket. message Address { message TcpIpAddress { - // Either the IPv4 or IPv6 address in bytes. Will either be 4 bytes or 16 + // Either the IPv4 or IPv6 address in bytes. Will be either 4 bytes or 16 // bytes in length. bytes ip_address = 1; // 0-64k, or -1 if not appropriate. @@ -271,7 +307,7 @@ message Address { } // An address type not included above. message OtherAddress { - // The human readable version of the value. + // The human readable version of the value. This value should be set. string name = 1; // The actual address message. google.protobuf.Any value = 2; @@ -284,12 +320,17 @@ message Address { } } +// Security represents details about how secure the socket is. message Security { message Tls { - // The key exchange used. e.g. X25519 - string key_exchange = 1; - // The cipher used. e.g. AES_128_GCM. - string cipher = 2; + oneof cipher_suite { + // The cipher suite name in the RFC 4346 format: + // https://tools.ietf.org/html/rfc4346#appendix-C + string standard_name = 1; + // Some other way to describe the cipher suite if + // the RFC 4346 name is not available. + string other_name = 2; + } // the certificate used by this endpoint. bytes local_certificate = 3; // the certificate used by the remote endpoint. @@ -307,7 +348,11 @@ message Security { } } +// SocketOption represents socket options for a socket. Specifically, these +// are the options returned by getsockopt(). message SocketOption { + // The full name of the socket option. Typically this will be the upper case + // name, such as "SO_REUSEPORT". string name = 1; // The human readable value of this socket option. At least one of value or // additional will be set. @@ -323,12 +368,17 @@ message SocketOptionTimeout { google.protobuf.Duration duration = 1; } +// For use with SocketOption's additional field. This is primarily used for +// SO_LINGER. message SocketOptionLinger { + // active maps to `struct linger.l_onoff` bool active = 1; + // duration maps to `struct linger.l_linger` google.protobuf.Duration duration = 2; } -// Tcp info for SOL_TCP, TCP_INFO +// For use with SocketOption's additional field. Tcp info for +// SOL_TCP and TCP_INFO. message SocketOptionTcpInfo { uint32 tcpi_state = 1; @@ -366,8 +416,10 @@ message SocketOptionTcpInfo { uint32 tcpi_reordering = 29; } +// Channelz is a service exposed by gRPC servers that provides detailed debug +// information. service Channelz { - // Gets all root channels (e.g. channels the application has directly + // Gets all root channels (i.e. channels the application has directly // created). This does not include subchannels nor non-top level channels. rpc GetTopChannels(GetTopChannelsRequest) returns (GetTopChannelsResponse); // Gets all servers that exist in the process. @@ -382,6 +434,22 @@ service Channelz { rpc GetSocket(GetSocketRequest) returns (GetSocketResponse); } +message GetTopChannelsRequest { + // start_channel_id indicates that only channels at or above this id should be + // included in the results. + int64 start_channel_id = 1; +} + +message GetTopChannelsResponse { + // list of channels that the connection detail service knows about. Sorted in + // ascending channel_id order. + repeated Channel channel = 1; + // If set, indicates that the list of channels is the final list. Requesting + // more channels can only return more if they are created after this RPC + // completes. + bool end = 2; +} + message GetServersRequest { // start_server_id indicates that only servers at or above this id should be // included in the results. @@ -415,42 +483,35 @@ message GetServerSocketsResponse { bool end = 2; } -message GetTopChannelsRequest { - // start_channel_id indicates that only channels at or above this id should be - // included in the results. - int64 start_channel_id = 1; -} - -message GetTopChannelsResponse { - // list of channels that the connection detail service knows about. Sorted in - // ascending channel_id order. - repeated Channel channel = 1; - // If set, indicates that the list of channels is the final list. Requesting - // more channels can only return more if they are created after this RPC - // completes. - bool end = 2; -} - message GetChannelRequest { + // channel_id is the identifier of the specific channel to get. int64 channel_id = 1; } message GetChannelResponse { + // The Channel that corresponds to the requested channel_id. This field + // should be set. Channel channel = 1; } message GetSubchannelRequest { + // subchannel_id is the identifier of the specific subchannel to get. int64 subchannel_id = 1; } message GetSubchannelResponse { + // The Subchannel that corresponds to the requested subchannel_id. This + // field should be set. Subchannel subchannel = 1; } message GetSocketRequest { + // socket_id is the identifier of the specific socket to get. int64 socket_id = 1; } message GetSocketResponse { + // The Socket that corresponds to the requested socket_id. This field + // should be set. Socket socket = 1; } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index f53a6c9be2..b20b8155a0 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -64,6 +64,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/channel_stack.cc', 'src/core/lib/channel/channel_stack_builder.cc', 'src/core/lib/channel/channel_trace.cc', + 'src/core/lib/channel/channelz.cc', 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/handshaker.cc', diff --git a/src/python/grpcio_testing/MANIFEST.in b/src/python/grpcio_testing/MANIFEST.in new file mode 100644 index 0000000000..39b3565217 --- /dev/null +++ b/src/python/grpcio_testing/MANIFEST.in @@ -0,0 +1,3 @@ +include grpc_version.py +recursive-include grpc_testing *.py +global-exclude *.pyc diff --git a/src/python/grpcio_testing/README.rst b/src/python/grpcio_testing/README.rst new file mode 100644 index 0000000000..c699b80fb6 --- /dev/null +++ b/src/python/grpcio_testing/README.rst @@ -0,0 +1,10 @@ +gRPC Python Testing Package +=========================== + +Testing utilities for gRPC Python + +Dependencies +------------ + +Depends on the `grpcio` package, available from PyPI via `pip install grpcio`. + diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb index 8ec2073d98..7b39f5a347 100755 --- a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -58,10 +58,6 @@ def main run_multiple_killed_watches(10, 0.1) STDERR.puts '1000 iterations, sleep 0.001 before killing thread' run_multiple_killed_watches(1000, 0.001) - STDERR.puts '10000 iterations, sleep 0.00001 before killing thread' - run_multiple_killed_watches(10_000, 0.00001) - STDERR.puts '20000 iterations, sleep 0.00001 before killing thread' - run_multiple_killed_watches(20_000, 0.00001) end main diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 02f84c0b96..031699ce8e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -68,8 +68,6 @@ grpc_channel_get_info_type grpc_channel_get_info_import; grpc_insecure_channel_create_type grpc_insecure_channel_create_import; grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import; grpc_channel_destroy_type grpc_channel_destroy_import; -grpc_channel_get_trace_type grpc_channel_get_trace_import; -grpc_channel_get_uuid_type grpc_channel_get_uuid_import; grpc_call_cancel_type grpc_call_cancel_import; grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import; grpc_call_ref_type grpc_call_ref_import; @@ -316,8 +314,6 @@ void grpc_rb_load_imports(HMODULE library) { grpc_insecure_channel_create_import = (grpc_insecure_channel_create_type) GetProcAddress(library, "grpc_insecure_channel_create"); grpc_lame_client_channel_create_import = (grpc_lame_client_channel_create_type) GetProcAddress(library, "grpc_lame_client_channel_create"); grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy"); - grpc_channel_get_trace_import = (grpc_channel_get_trace_type) GetProcAddress(library, "grpc_channel_get_trace"); - grpc_channel_get_uuid_import = (grpc_channel_get_uuid_type) GetProcAddress(library, "grpc_channel_get_uuid"); grpc_call_cancel_import = (grpc_call_cancel_type) GetProcAddress(library, "grpc_call_cancel"); grpc_call_cancel_with_status_import = (grpc_call_cancel_with_status_type) GetProcAddress(library, "grpc_call_cancel_with_status"); grpc_call_ref_import = (grpc_call_ref_type) GetProcAddress(library, "grpc_call_ref"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 46d3bf5a33..474405ae3f 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -179,12 +179,6 @@ extern grpc_lame_client_channel_create_type grpc_lame_client_channel_create_impo typedef void(*grpc_channel_destroy_type)(grpc_channel* channel); extern grpc_channel_destroy_type grpc_channel_destroy_import; #define grpc_channel_destroy grpc_channel_destroy_import -typedef char*(*grpc_channel_get_trace_type)(grpc_channel* channel); -extern grpc_channel_get_trace_type grpc_channel_get_trace_import; -#define grpc_channel_get_trace grpc_channel_get_trace_import -typedef intptr_t(*grpc_channel_get_uuid_type)(grpc_channel* channel); -extern grpc_channel_get_uuid_type grpc_channel_get_uuid_import; -#define grpc_channel_get_uuid grpc_channel_get_uuid_import typedef grpc_call_error(*grpc_call_cancel_type)(grpc_call* call, void* reserved); extern grpc_call_cancel_type grpc_call_cancel_import; #define grpc_call_cancel grpc_call_cancel_import diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 86240814bd..6e601bdc91 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -120,7 +120,7 @@ module GRPC @send_initial_md_mutex.synchronize do return if @metadata_sent @metadata_to_send.merge!(new_metadata) - @metadata_tag = ActiveCall.client_invoke(@call, @metadata_to_send) + ActiveCall.client_invoke(@call, @metadata_to_send) @metadata_sent = true end end |