diff options
Diffstat (limited to 'src')
33 files changed, 1060 insertions, 621 deletions
diff --git a/src/c-ares/gen_build_yaml.py b/src/c-ares/gen_build_yaml.py index 6f8b7485ac..4600d8d224 100755 --- a/src/c-ares/gen_build_yaml.py +++ b/src/c-ares/gen_build_yaml.py @@ -29,10 +29,14 @@ try: subprocess.call("third_party/cares/cares/configure", shell=True) def config_platform(x): - if 'linux' in sys.platform: - return 'src/cares/cares/config_linux/ares_config.h' if 'darwin' in sys.platform: return 'src/cares/cares/config_darwin/ares_config.h' + if 'freebsd' in sys.platform: + return 'src/cares/cares/config_freebsd/ares_config.h' + if 'linux' in sys.platform: + return 'src/cares/cares/config_linux/ares_config.h' + if 'openbsd' in sys.platform: + return 'src/cares/cares/config_openbsd/ares_config.h' if not os.path.isfile('third_party/cares/cares/ares_config.h'): gen_ares_build(x) return 'third_party/cares/cares/ares_config.h' @@ -124,8 +128,10 @@ try: "third_party/cares/cares/config-win32.h", "third_party/cares/cares/setup_once.h", "third_party/cares/ares_build.h", + "third_party/cares/config_darwin/ares_config.h", + "third_party/cares/config_freebsd/ares_config.h", "third_party/cares/config_linux/ares_config.h", - "third_party/cares/config_darwin/ares_config.h" + "third_party/cares/config_openbsd/ares_config.h" ], }] except: diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e4b19a2c4a..9462d1085e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -152,10 +152,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); +static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, + void *tp, grpc_error *error); static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); @@ -218,6 +222,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, t->write_cb_pool = next; } + t->flow_control.bdp_estimator.Destroy(); + + GRPC_ERROR_UNREF(t->closed_with_error); gpr_free(t->ping_acks); gpr_free(t->peer_string); gpr_free(t); @@ -303,6 +310,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, + next_bdp_ping_timer_expired_locked, t, + grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, @@ -315,7 +325,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); - grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string); + t->flow_control.bdp_estimator.Init(t->peer_string); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); @@ -562,6 +572,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } + if (t->flow_control.enable_bdp_probe) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + schedule_bdp_ping_locked(exec_ctx, t); + } + grpc_chttp2_act_on_flowctl_action( exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, @@ -595,7 +610,9 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { - if (!t->closed) { + end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); + cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + if (t->closed_with_error == GRPC_ERROR_NONE) { if (!grpc_error_has_clear_grpc_status(error)) { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); @@ -610,13 +627,16 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - t->closed = 1; + GPR_ASSERT(error != GRPC_ERROR_NONE); + t->closed_with_error = GRPC_ERROR_REF(error); connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); - grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); if (t->ping_state.is_delayed_ping_timer_set) { grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); } + if (t->have_next_bdp_ping_timer) { + grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer); + } switch (t->keepalive_state) { case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); @@ -636,8 +656,8 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, while (grpc_chttp2_list_pop_writable_stream(t, &s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } - end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); - cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE); + grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -940,7 +960,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { + if (t->closed_with_error == GRPC_ERROR_NONE && + grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } } @@ -993,7 +1014,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; - if (t->closed) { + if (t->closed_with_error != GRPC_ERROR_NONE) { r.writing = false; } else { r = grpc_chttp2_begin_write(exec_ctx, t); @@ -1456,7 +1477,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (!s->write_closed) { if (t->is_client) { - if (!t->closed) { + if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_ASSERT(s->id == 0); grpc_chttp2_list_add_waiting_for_concurrency(t, s); maybe_start_some_streams(exec_ctx, t); @@ -1464,7 +1485,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_cancel_stream( exec_ctx, t, s, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"), + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } else { @@ -1686,6 +1708,7 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ grpc_chttp2_ping_queue *pq = &t->ping_queue; + GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); @@ -1695,6 +1718,12 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure *on_initiate, grpc_closure *on_ack) { + if (t->closed_with_error != GRPC_ERROR_NONE) { + GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, + GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error)); + return; + } grpc_chttp2_ping_queue *pq = &t->ping_queue; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); @@ -1753,7 +1782,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); /*The transport will be closed after the write is done */ close_transport_locked( - exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings")); + exec_ctx, t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } @@ -2432,12 +2463,6 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); } } - if (action.need_ping) { - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); - send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, - &t->finish_bdp_ping_locked); - } } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2487,14 +2512,13 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } GPR_SWAP(grpc_error *, err, error); GRPC_ERROR_UNREF(err); - if (!t->closed) { + if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { - grpc_bdp_estimator_add_incoming_bytes( - &t->flow_control.bdp_estimator, + t->flow_control.bdp_estimator->AddIncomingBytes( (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); errors[1] = grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); @@ -2528,13 +2552,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("post_reading_action_locked", 0); bool keep_reading = false; - if (error == GRPC_ERROR_NONE && t->closed) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"); + if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1); } if (error != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; - } else if (!t->closed) { + } else if (t->closed_with_error == GRPC_ERROR_NONE) { keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } @@ -2559,28 +2584,57 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action_locked", 0); } +// t is reffed prior to calling the first time, and once the callback chain +// that kicks off finishes, it's unreffed +static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + t->flow_control.bdp_estimator->SchedulePing(); + send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, + &t->finish_bdp_ping_locked); +} + static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { - gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string); + gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string, + grpc_error_string(error)); } /* Reset the keepalive ping timer */ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); } - grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator); + t->flow_control.bdp_estimator->StartPing(); } static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { - gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); + gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string, + grpc_error_string(error)); + } + if (error != GRPC_ERROR_NONE) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + return; } - grpc_bdp_estimator_complete_ping(exec_ctx, &t->flow_control.bdp_estimator); + grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx); + GPR_ASSERT(!t->have_next_bdp_ping_timer); + t->have_next_bdp_ping_timer = true; + grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping, + &t->next_bdp_ping_timer_expired_locked); +} - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, + void *tp, grpc_error *error) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; + GPR_ASSERT(t->have_next_bdp_ping_timer); + t->have_next_bdp_ping_timer = false; + if (error != GRPC_ERROR_NONE) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + return; + } + schedule_bdp_ping_locked(exec_ctx, t); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, @@ -2645,7 +2699,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (t->destroying || t->closed) { + if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; } else if (error == GRPC_ERROR_NONE) { if (t->keepalive_permit_without_calls || @@ -2703,8 +2757,11 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout")); + close_transport_locked( + exec_ctx, t, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL)); } } else { /* The watchdog timer should have been cancelled by diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 2428e2526d..d0e80c4bd5 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -459,12 +459,9 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( } } if (tfc->enable_bdp_probe) { - action.need_ping = - grpc_bdp_estimator_need_ping(exec_ctx, &tfc->bdp_estimator); - // get bdp estimate and update initial_window accordingly. int64_t estimate = -1; - if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) { + if (tfc->bdp_estimator->EstimateBdp(&estimate)) { double target = 1 + log2((double)estimate); // target might change based on how much memory pressure we are under @@ -491,7 +488,7 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( // get bandwidth estimate and update max_frame accordingly. double bw_dbl = -1; - if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) { + if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) { // we target the max of BDP or bandwidth in microseconds. int32_t frame_size = (int32_t)GPR_CLAMP( GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 5d27bb8d67..703f3ba348 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -37,6 +37,7 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/pid_controller.h" @@ -268,7 +269,7 @@ typedef struct { bool enable_bdp_probe; /* bdp estimation */ - grpc_bdp_estimator bdp_estimator; + grpc_core::ManualConstructor<grpc_core::BdpEstimator> bdp_estimator; /* pid controller */ bool pid_controller_initialized; @@ -297,7 +298,7 @@ struct grpc_chttp2_transport { /** is the transport destroying itself? */ uint8_t destroying; /** has the upper layer closed the transport? */ - uint8_t closed; + grpc_error *closed_with_error; /** is there a read request to the endpoint outstanding? */ uint8_t endpoint_reading; @@ -339,7 +340,7 @@ struct grpc_chttp2_transport { /** hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; /** is this a client? */ - uint8_t is_client; + bool is_client; /** data to write next write */ grpc_slice_buffer qbuf; @@ -349,14 +350,14 @@ struct grpc_chttp2_transport { uint32_t write_buffer_size; /** have we seen a goaway */ - uint8_t seen_goaway; + bool seen_goaway; /** have we sent a goaway */ grpc_chttp2_sent_goaway_state sent_goaway_state; /** are the local settings dirty and need to be sent? */ - uint8_t dirtied_local_settings; + bool dirtied_local_settings; /** have local settings been sent? */ - uint8_t sent_local_settings; + bool sent_local_settings; /** bitmask of setting indexes to send out */ uint32_t force_send_settings; /** settings values */ @@ -421,6 +422,7 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb *write_cb_pool; /* bdp estimator */ + grpc_closure next_bdp_ping_timer_expired_locked; grpc_closure start_bdp_ping_locked; grpc_closure finish_bdp_ping_locked; @@ -441,6 +443,10 @@ struct grpc_chttp2_transport { /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; + /* next bdp ping timer */ + bool have_next_bdp_ping_timer; + grpc_timer next_bdp_ping_timer; + /* keep-alive ping support */ /** Closure to initialize a keepalive ping */ grpc_closure init_keepalive_ping_locked; @@ -748,7 +754,6 @@ typedef struct { grpc_chttp2_flowctl_urgency send_setting_update; uint32_t initial_window_size; uint32_t max_frame_size; - bool need_ping; } grpc_chttp2_flowctl_action; // Reads the flow control data and returns and actionable struct that will tell diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 25c1a5ef05..c6fecf2ee9 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -245,7 +245,8 @@ class WriteContext { void UpdateStreamsNoLongerStalled() { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) { - if (!t_->closed && grpc_chttp2_list_add_writable_stream(t_, s)) { + if (t_->closed_with_error == GRPC_ERROR_NONE && + grpc_chttp2_list_add_writable_stream(t_, s)) { if (!stream_ref_if_not_destroyed(&s->refcount->refs)) { grpc_chttp2_list_remove_writable_stream(t_, s); } diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 1001d74c22..67a8358927 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -62,96 +62,22 @@ typedef struct inproc_transport { struct inproc_stream *stream_list; } inproc_transport; -typedef struct sb_list_entry { - grpc_slice_buffer sb; - struct sb_list_entry *next; -} sb_list_entry; - -// Specialize grpc_byte_stream for our use case -typedef struct { - grpc_byte_stream base; - sb_list_entry *le; - grpc_error *shutdown_error; -} inproc_slice_byte_stream; - -typedef struct { - // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases - sb_list_entry *head; - sb_list_entry *tail; -} slice_buffer_list; - -static void slice_buffer_list_init(slice_buffer_list *l) { - l->head = NULL; - l->tail = NULL; -} - -static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) { - grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb); - gpr_free(le); -} - -static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx, - slice_buffer_list *l) { - sb_list_entry *curr = l->head; - while (curr != NULL) { - sb_list_entry *le = curr; - curr = curr->next; - sb_list_entry_destroy(exec_ctx, le); - } - l->head = NULL; - l->tail = NULL; -} - -static bool slice_buffer_list_empty(slice_buffer_list *l) { - return l->head == NULL; -} - -static void slice_buffer_list_append_entry(slice_buffer_list *l, - sb_list_entry *next) { - next->next = NULL; - if (l->tail) { - l->tail->next = next; - l->tail = next; - } else { - l->head = next; - l->tail = next; - } -} - -static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) { - sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next)); - grpc_slice_buffer_init(&next->sb); - slice_buffer_list_append_entry(l, next); - return &next->sb; -} - -static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) { - sb_list_entry *ret = l->head; - l->head = l->head->next; - if (l->head == NULL) { - l->tail = NULL; - } - return ret; -} - typedef struct inproc_stream { inproc_transport *t; grpc_metadata_batch to_read_initial_md; uint32_t to_read_initial_md_flags; bool to_read_initial_md_filled; - slice_buffer_list to_read_message; grpc_metadata_batch to_read_trailing_md; bool to_read_trailing_md_filled; - bool reads_needed; - bool read_closure_scheduled; - grpc_closure read_closure; + bool ops_needed; + bool op_closure_scheduled; + grpc_closure op_closure; // Write buffer used only during gap at init time when client-side // stream is set up but server side stream is not yet set up grpc_metadata_batch write_buffer_initial_md; bool write_buffer_initial_md_filled; uint32_t write_buffer_initial_md_flags; grpc_millis write_buffer_deadline; - slice_buffer_list write_buffer_message; grpc_metadata_batch write_buffer_trailing_md; bool write_buffer_trailing_md_filled; grpc_error *write_buffer_cancel_error; @@ -164,11 +90,15 @@ typedef struct inproc_stream { gpr_arena *arena; + grpc_transport_stream_op_batch *send_message_op; + grpc_transport_stream_op_batch *send_trailing_md_op; grpc_transport_stream_op_batch *recv_initial_md_op; grpc_transport_stream_op_batch *recv_message_op; grpc_transport_stream_op_batch *recv_trailing_md_op; - inproc_slice_byte_stream recv_message_stream; + grpc_slice_buffer recv_message; + grpc_slice_buffer_stream recv_stream; + bool recv_inited; bool initial_md_sent; bool trailing_md_sent; @@ -187,54 +117,11 @@ typedef struct inproc_stream { struct inproc_stream *stream_list_next; } inproc_stream; -static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, size_t max, - grpc_closure *on_complete) { - // Because inproc transport always provides the entire message atomically, - // the byte stream always has data available when this function is called. - // Thus, this function always returns true (unlike other transports) and - // there is never any need to schedule a closure - return true; -} - -static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, - grpc_slice *slice) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - if (stream->shutdown_error != GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(stream->shutdown_error); - } - *slice = grpc_slice_buffer_take_first(&stream->le->sb); - return GRPC_ERROR_NONE; -} - -static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, - grpc_error *error) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - GRPC_ERROR_UNREF(stream->shutdown_error); - stream->shutdown_error = error; -} - -static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - sb_list_entry_destroy(exec_ctx, stream->le); - GRPC_ERROR_UNREF(stream->shutdown_error); -} - -static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = { - inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull, - inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy}; - -void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, - sb_list_entry *le) { - s->base.length = (uint32_t)le->sb.length; - s->base.flags = 0; - s->base.vtable = &inproc_slice_byte_stream_vtable; - s->le = le; - s->shutdown_error = GRPC_ERROR_NONE; -} +static grpc_closure do_nothing_closure; +static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, + grpc_error *error); +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void ref_transport(inproc_transport *t) { INPROC_LOG(GPR_DEBUG, "ref_transport %p", t); @@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s, static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s); - slice_buffer_list_destroy(exec_ctx, &s->to_read_message); - slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message); GRPC_ERROR_UNREF(s->write_buffer_cancel_error); GRPC_ERROR_UNREF(s->cancel_self_error); GRPC_ERROR_UNREF(s->cancel_other_error); + if (s->recv_inited) { + grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message); + } + unref_transport(exec_ctx, s->t); if (s->closure_at_destroy) { @@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { } } -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client, bool is_initial) { for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL; @@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->write_buffer_initial_md_filled = false; grpc_metadata_batch_init(&s->write_buffer_trailing_md); s->write_buffer_trailing_md_filled = false; - slice_buffer_list_init(&s->to_read_message); - slice_buffer_list_init(&s->write_buffer_message); - s->reads_needed = false; - s->read_closure_scheduled = false; - GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s, + s->ops_needed = false; + s->op_closure_scheduled = false; + GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s, grpc_schedule_on_exec_ctx); s->t = t; s->closure_at_destroy = NULL; @@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md); cs->write_buffer_initial_md_filled = false; } - while (!slice_buffer_list_empty(&cs->write_buffer_message)) { - slice_buffer_list_append_entry( - &s->to_read_message, - slice_buffer_list_pophead(&cs->write_buffer_message)); - } if (cs->write_buffer_trailing_md_filled) { fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0, &s->to_read_trailing_md, NULL, @@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, } } +// Call the on_complete closure associated with this stream_op_batch if +// this stream_op_batch is only one of the pending operations for this +// stream. This is called when one of the pending operations for the stream +// is done and about to be NULLed out +static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *s, grpc_error *error, + grpc_transport_stream_op_batch *op, + const char *msg) { + int is_sm = (int)(op == s->send_message_op); + int is_stm = (int)(op == s->send_trailing_md_op); + int is_rim = (int)(op == s->recv_initial_md_op); + int is_rm = (int)(op == s->recv_message_op); + int is_rtm = (int)(op == s->recv_trailing_md_op); + + if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { + INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error)); + } +} + +static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *s, + grpc_error *error) { + if (s && s->ops_needed && !s->op_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error)); + s->op_closure_scheduled = true; + s->ops_needed = false; + } +} + static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_error *error) { - INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s); // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata if (!s->trailing_md_sent) { @@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(error); } - if (other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, - GRPC_ERROR_REF(error)); - other->read_closure_scheduled = true; - } - other->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, other, error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(error); } @@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, err); // Last use of err so no need to REF and then UNREF it - if ((s->recv_initial_md_op != s->recv_message_op) && - (s->recv_initial_md_op != s->recv_trailing_md_op)) { - INPROC_LOG(GPR_DEBUG, - "fail_helper %p scheduling initial-metadata-on-complete %p", - error, s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, - GRPC_ERROR_REF(error)); - } + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_initial_md_op, + "fail_helper scheduling recv-initial-metadata-on-complete"); s->recv_initial_md_op = NULL; } if (s->recv_message_op) { @@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p", - s, error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(error)); - } + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_message_op, + "fail_helper scheduling recv-message-on-complete"); s->recv_message_op = NULL; } + if (s->send_message_op) { + complete_if_batch_end_locked( + exec_ctx, s, error, s->send_message_op, + "fail_helper scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + if (s->send_trailing_md_op) { + complete_if_batch_end_locked( + exec_ctx, s, error, s->send_trailing_md_op, + "fail_helper scheduling send-trailng-md-on-complete"); + s->send_trailing_md_op = NULL; + } if (s->recv_trailing_md_op) { INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling trailing-md-on-complete %p", s, error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(error)); + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_trailing_md_op, + "fail_helper scheduling recv-trailing-metadata-on-complete"); s->recv_trailing_md_op = NULL; } close_other_side_locked(exec_ctx, s, "fail_helper:other_side"); @@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_ERROR_UNREF(error); } -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void message_transfer_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *sender, + inproc_stream *receiver) { + size_t remaining = + sender->send_message_op->payload->send_message.send_message->length; + if (receiver->recv_inited) { + grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message); + } + grpc_slice_buffer_init(&receiver->recv_message); + receiver->recv_inited = true; + do { + grpc_slice message_slice; + grpc_closure unused; + GPR_ASSERT(grpc_byte_stream_next( + exec_ctx, sender->send_message_op->payload->send_message.send_message, + SIZE_MAX, &unused)); + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, sender->send_message_op->payload->send_message.send_message, + &message_slice); + if (error != GRPC_ERROR_NONE) { + cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error)); + break; + } + GPR_ASSERT(error == GRPC_ERROR_NONE); + remaining -= GRPC_SLICE_LENGTH(message_slice); + grpc_slice_buffer_add(&receiver->recv_message, message_slice); + } while (remaining > 0); + + grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, + 0); + *receiver->recv_message_op->payload->recv_message.recv_message = + &receiver->recv_stream.base; + INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready", + receiver); + GRPC_CLOSURE_SCHED( + exec_ctx, + receiver->recv_message_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); + complete_if_batch_end_locked( + exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op, + "message_transfer scheduling sender on_complete"); + complete_if_batch_end_locked( + exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op, + "message_transfer scheduling receiver on_complete"); + + receiver->recv_message_op = NULL; + sender->send_message_op = NULL; +} + +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures - // and then return to reads_needed state if still needed + // and then return to ops_needed state if still needed // Since this is a closure directly invoked by the combiner, it should not // unref the error parameter explicitly; the combiner will do that implicitly @@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, bool needs_close = false; - INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg); inproc_stream *s = (inproc_stream *)arg; gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed gpr_mu_lock(mu); - s->read_closure_scheduled = false; + s->op_closure_scheduled = false; // cancellation takes precedence + inproc_stream *other = s->other_side; + if (s->cancel_self_error != GRPC_ERROR_NONE) { fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error)); goto done; @@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, goto done; } - if (s->recv_initial_md_op) { - if (!s->to_read_initial_md_filled) { - // We entered the state machine on some other kind of read even though - // we still haven't satisfied initial md . That's an error. - new_err = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing"); - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for no " - "initial md %p", - s, new_err); + if (s->send_message_op && other) { + if (other->recv_message_op) { + message_transfer_locked(exec_ctx, s, other); + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + } else if (!s->t->is_client && + (s->trailing_md_sent || other->recv_trailing_md_op)) { + // A server send will never be matched if the client is waiting + // for trailing metadata already + complete_if_batch_end_locked( + exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + } + // Pause a send trailing metadata if there is still an outstanding + // send message unless we know that the send message will never get + // matched to a receive. This happens on the client if the server has + // already sent status. + if (s->send_trailing_md_op && + (!s->send_message_op || + (s->t->is_client && + (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { + grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + : &other->to_read_trailing_md; + bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + : &other->to_read_trailing_md_filled; + if (*destfilled || s->trailing_md_sent) { + // The buffer is already in use; that's an error! + INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); + new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; - } else if (s->initial_md_recvd) { + } else { + if (other && !other->closed) { + fill_in_metadata(exec_ctx, s, + s->send_trailing_md_op->payload->send_trailing_metadata + .send_trailing_metadata, + 0, dest, NULL, destfilled); + } + s->trailing_md_sent = true; + if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling trailing-md-on-complete", s); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_ERROR_NONE); + s->recv_trailing_md_op = NULL; + needs_close = true; + } + } + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + complete_if_batch_end_locked( + exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op, + "op_state_machine scheduling send-trailing-metadata-on-complete"); + s->send_trailing_md_op = NULL; + } + if (s->recv_initial_md_op) { + if (s->initial_md_recvd) { new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md"); INPROC_LOG( GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for already " + "op_state_machine %p scheduling on_complete errors for already " "recvd initial md %p", s, new_err); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; } - s->initial_md_recvd = true; - new_err = fill_in_metadata( - exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, - s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata, - s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL); - s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata - ->deadline = s->deadline; - grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); - s->to_read_initial_md_filled = false; - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling initial-metadata-ready %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, - s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata_ready, - GRPC_ERROR_REF(new_err)); - if ((s->recv_initial_md_op != s->recv_message_op) && - (s->recv_initial_md_op != s->recv_trailing_md_op)) { - INPROC_LOG( - GPR_DEBUG, - "read_state_machine %p scheduling initial-metadata-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, - GRPC_ERROR_REF(new_err)); - } - s->recv_initial_md_op = NULL; - - if (new_err != GRPC_ERROR_NONE) { + if (s->to_read_initial_md_filled) { + s->initial_md_recvd = true; + new_err = fill_in_metadata( + exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata, + s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, + NULL); + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata->deadline = s->deadline; + grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); + s->to_read_initial_md_filled = false; INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors2 %p", s, + "op_state_machine %p scheduling initial-metadata-ready %p", s, new_err); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); - goto done; + GRPC_CLOSURE_SCHED(exec_ctx, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata_ready, + GRPC_ERROR_REF(new_err)); + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_initial_md_op, + "op_state_machine scheduling recv-initial-metadata-on-complete"); + s->recv_initial_md_op = NULL; + + if (new_err != GRPC_ERROR_NONE) { + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling on_complete errors2 %p", s, + new_err); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } } } - if (s->to_read_initial_md_filled) { - new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame"); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); - goto done; - } - if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) { - inproc_slice_byte_stream_init( - &s->recv_message_stream, - slice_buffer_list_pophead(&s->to_read_message)); - *s->recv_message_op->payload->recv_message.recv_message = - &s->recv_message_stream.base; - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); - GRPC_CLOSURE_SCHED( - exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); + if (s->recv_message_op) { + if (other && other->send_message_op) { + message_transfer_locked(exec_ctx, other, s); + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); } - s->recv_message_op = NULL; + } + if (s->recv_trailing_md_op && s->t->is_client && other && + other->send_message_op) { + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); } if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { @@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md"); INPROC_LOG( GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for already " + "op_state_machine %p scheduling on_complete errors for already " "recvd trailing md %p", s, new_err); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); @@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, if (s->recv_message_op != NULL) { // This message needs to be wrapped up because it will never be // satisfied - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", - s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); - } + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_message_op, + "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = NULL; } + if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { + // Nothing further will try to receive from this stream, so finish off + // any outstanding send_message op + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } if (s->recv_trailing_md_op != NULL) { // We wanted trailing metadata and we got it s->trailing_md_recvd = true; @@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, // (If the server hasn't already sent its trailing md, it doesn't have // a final status, so don't mark this op complete) if (s->t->is_client || s->trailing_md_sent) { - INPROC_LOG( - GPR_DEBUG, - "read_state_machine %p scheduling trailing-md-on-complete %p", s, - new_err); + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling trailing-md-on-complete %p", + s, new_err); GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, GRPC_ERROR_REF(new_err)); s->recv_trailing_md_op = NULL; needs_close = true; } else { INPROC_LOG(GPR_DEBUG, - "read_state_machine %p server needs to delay handling " + "op_state_machine %p server needs to delay handling " "trailing-md-on-complete %p", s, new_err); } } else { INPROC_LOG( GPR_DEBUG, - "read_state_machine %p has trailing md but not yet waiting for it", - s); + "op_state_machine %p has trailing md but not yet waiting for it", s); } } if (s->trailing_md_recvd && s->recv_message_op) { // No further message will come on this stream, so finish off the // recv_message_op - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); - } + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_message_op, + "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = NULL; } - if (s->recv_message_op || s->recv_trailing_md_op) { + if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) && + s->send_message_op) { + // Nothing further will try to receive from this stream, so finish off + // any outstanding send_message op + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op || + s->recv_message_op || s->recv_trailing_md_op) { // Didn't get the item we wanted so we still need to get // rescheduled - INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s, - s->recv_message_op, s->recv_trailing_md_op); - s->reads_needed = true; + INPROC_LOG( + GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s, + s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op, + s->recv_message_op, s->recv_trailing_md_op); + s->ops_needed = true; } done: if (needs_close) { - close_other_side_locked(exec_ctx, s, "read_state_machine"); + close_other_side_locked(exec_ctx, s, "op_state_machine"); close_stream_locked(exec_ctx, s); } gpr_mu_unlock(mu); GRPC_ERROR_UNREF(new_err); } -static grpc_closure do_nothing_closure; - static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_error *error) { bool ret = false; // was the cancel accepted @@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (s->cancel_self_error == GRPC_ERROR_NONE) { ret = true; s->cancel_self_error = GRPC_ERROR_REF(error); - if (s->reads_needed) { - if (!s->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, - GRPC_ERROR_REF(s->cancel_self_error)); - s->read_closure_scheduled = true; - } - s->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error); // Send trailing md to the other side indicating cancellation, even if we // already have s->trailing_md_sent = true; @@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); } - if (other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, - GRPC_ERROR_REF(other->cancel_other_error)); - other->read_closure_scheduled = true; - } - other->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, other, + other->cancel_other_error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); } @@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, // couldn't complete that because we hadn't yet sent out trailing // md, now's the chance if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "cancel_stream %p scheduling trailing-md-on-complete %p", s, - s->cancel_self_error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(s->cancel_self_error)); + complete_if_batch_end_locked( + exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op, + "cancel_stream scheduling trailing-md-on-complete"); s->recv_trailing_md_op = NULL; } } @@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, // already self-canceled so still give it an error error = GRPC_ERROR_REF(s->cancel_self_error); } else { - INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s, + INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s, + s->t->is_client ? "client" : "server", op->send_initial_metadata ? " send_initial_metadata" : "", op->send_message ? " send_message" : "", op->send_trailing_metadata ? " send_trailing_metadata" : "", @@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, bool needs_close = false; + inproc_stream *other = s->other_side; if (error == GRPC_ERROR_NONE && - (op->send_initial_metadata || op->send_message || - op->send_trailing_metadata)) { - inproc_stream *other = s->other_side; + (op->send_initial_metadata || op->send_trailing_metadata)) { if (s->t->is_closed) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); } @@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->initial_md_sent = true; } } - } - if (error == GRPC_ERROR_NONE && op->send_message) { - size_t remaining = op->payload->send_message.send_message->length; - grpc_slice_buffer *dest = slice_buffer_list_append( - (other == NULL) ? &s->write_buffer_message : &other->to_read_message); - do { - grpc_slice message_slice; - grpc_closure unused; - GPR_ASSERT(grpc_byte_stream_next(exec_ctx, - op->payload->send_message.send_message, - SIZE_MAX, &unused)); - error = grpc_byte_stream_pull( - exec_ctx, op->payload->send_message.send_message, &message_slice); - if (error != GRPC_ERROR_NONE) { - cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error)); - break; - } - GPR_ASSERT(error == GRPC_ERROR_NONE); - remaining -= GRPC_SLICE_LENGTH(message_slice); - grpc_slice_buffer_add(dest, message_slice); - } while (remaining != 0); - grpc_byte_stream_destroy(exec_ctx, - op->payload->send_message.send_message); - } - if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { - grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md - : &other->to_read_trailing_md; - bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled - : &other->to_read_trailing_md_filled; - if (*destfilled || s->trailing_md_sent) { - // The buffer is already in use; that's an error! - INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); - } else { - if (!other->closed) { - fill_in_metadata( - exec_ctx, s, - op->payload->send_trailing_metadata.send_trailing_metadata, 0, - dest, NULL, destfilled); - } - s->trailing_md_sent = true; - if (!s->t->is_client && s->trailing_md_recvd && - s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "perform_stream_op %p scheduling trailing-md-on-complete", - s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_NONE); - s->recv_trailing_md_op = NULL; - needs_close = true; - } - } - } - if (other != NULL && other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error); - other->read_closure_scheduled = true; - } - other->reads_needed = false; + maybe_schedule_op_closure_locked(exec_ctx, other, error); } } + if (error == GRPC_ERROR_NONE && - (op->recv_initial_metadata || op->recv_message || + (op->send_message || op->send_trailing_metadata || + op->recv_initial_metadata || op->recv_message || op->recv_trailing_metadata)) { - // If there are any reads, mark it so that the read closure will react to - // them + // Mark ops that need to be processed by the closure + if (op->send_message) { + s->send_message_op = op; + } + if (op->send_trailing_metadata) { + s->send_trailing_md_op = op; + } if (op->recv_initial_metadata) { s->recv_initial_md_op = op; } @@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } // We want to initiate the closure if: - // 1. There is initial metadata and something ready to take that - // 2. There is a message and something ready to take it - // 3. There is trailing metadata, even if nothing specifically wants - // that because that can shut down the message as well - if ((s->to_read_initial_md_filled && op->recv_initial_metadata) || - ((!slice_buffer_list_empty(&s->to_read_message) || - s->trailing_md_recvd) && - op->recv_message) || - (s->to_read_trailing_md_filled)) { - if (!s->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE); - s->read_closure_scheduled = true; + // 1. We want to send a message and the other side wants to receive or end + // 2. We want to send trailing metadata and there isn't an unmatched send + // 3. We want initial metadata and the other side has sent it + // 4. We want to receive a message and there is a message ready + // 5. There is trailing metadata, even if nothing specifically wants + // that because that can shut down the receive message as well + if ((op->send_message && other && ((other->recv_message_op != NULL) || + (other->recv_trailing_md_op != NULL))) || + (op->send_trailing_metadata && !op->send_message) || + (op->recv_initial_metadata && s->to_read_initial_md_filled) || + (op->recv_message && (other && other->send_message_op != NULL)) || + (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { + if (!s->op_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE); + s->op_closure_scheduled = true; } } else { - s->reads_needed = true; + s->ops_needed = true; } } else { if (error != GRPC_ERROR_NONE) { - // Schedule op's read closures that we didn't push to read state machine + // Schedule op's closures that we didn't push to op state machine if (op->recv_initial_metadata) { INPROC_LOG( GPR_DEBUG, diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc index 5bd7884e28..5d737c56cb 100644 --- a/src/core/lib/debug/stats_data.cc +++ b/src/core/lib/debug/stats_data.cc @@ -104,6 +104,10 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "combiner_locks_scheduled_items", "combiner_locks_scheduled_final_items", "combiner_locks_offloaded", + "call_combiner_locks_initiated", + "call_combiner_locks_scheduled_items", + "call_combiner_set_notify_on_cancel", + "call_combiner_cancelled", "executor_scheduled_short_items", "executor_scheduled_long_items", "executor_scheduled_to_self", @@ -112,6 +116,9 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_push_retries", "server_requested_calls", "server_slowpath_requests_queued", + "cq_ev_queue_trylock_failures", + "cq_ev_queue_trylock_successes", + "cq_ev_queue_transient_pop_failures", }; const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of client side calls created by this process", @@ -210,6 +217,11 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of items scheduled against combiner locks", "Number of final items scheduled against combiner locks", "Number of combiner locks offloaded to different threads", + "Number of call combiner lock entries by process (first items queued to a " + "call combiner)", + "Number of items scheduled against call combiner locks", + "Number of times a cancellation callback was set on a call combiner", + "Number of times a call combiner was cancelled", "Number of finite runtime closures scheduled against the executor (gRPC " "thread pool)", "Number of potentially infinite runtime closures scheduled against the " @@ -222,6 +234,12 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "How many calls were requested (not necessarily received) by the server", "How many times was the server slow path taken (indicates too few " "outstanding requests)", + "Number of lock (trylock) acquisition failures on completion queue event " + "queue. High value here indicates high contention on completion queues", + "Number of lock (trylock) acquisition successes on completion queue event " + "queue.", + "Number of times NULL was popped out of completion queue's event queue " + "even though the event queue was not empty", }; const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { "call_initial_size", diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index d8e4e7d264..031942df5c 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -110,6 +110,10 @@ typedef enum { GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS, GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS, GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED, + GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED, + GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS, + GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL, + GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF, @@ -118,6 +122,9 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS, GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES, GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; @@ -404,6 +411,17 @@ typedef enum { #define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED) +#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED) +#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS) +#define GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL) +#define GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED) #define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS) @@ -425,6 +443,15 @@ typedef enum { #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES) #define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \ grpc_stats_inc_call_initial_size((exec_ctx), (int)(value)) void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x); diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 5c0ab2262e..af4553028e 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -245,6 +245,16 @@ doc: Number of final items scheduled against combiner locks - counter: combiner_locks_offloaded doc: Number of combiner locks offloaded to different threads +# call combiner locks +- counter: call_combiner_locks_initiated + doc: Number of call combiner lock entries by process + (first items queued to a call combiner) +- counter: call_combiner_locks_scheduled_items + doc: Number of items scheduled against call combiner locks +- counter: call_combiner_set_notify_on_cancel + doc: Number of times a cancellation callback was set on a call combiner +- counter: call_combiner_cancelled + doc: Number of times a call combiner was cancelled # executor - counter: executor_scheduled_short_items doc: Number of finite runtime closures scheduled against the executor @@ -272,4 +282,13 @@ - counter: server_slowpath_requests_queued doc: How many times was the server slow path taken (indicates too few outstanding requests) - +# cq +- counter: cq_ev_queue_trylock_failures + doc: Number of lock (trylock) acquisition failures on completion queue event + queue. High value here indicates high contention on completion queues +- counter: cq_ev_queue_trylock_successes + doc: Number of lock (trylock) acquisition successes on completion queue event + queue. +- counter: cq_ev_queue_transient_pop_failures + doc: Number of times NULL was popped out of completion queue's event queue + even though the event queue was not empty diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 54869977b0..04b6d471f6 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -79,6 +79,10 @@ combiner_locks_initiated_per_iteration:FLOAT, combiner_locks_scheduled_items_per_iteration:FLOAT, combiner_locks_scheduled_final_items_per_iteration:FLOAT, combiner_locks_offloaded_per_iteration:FLOAT, +call_combiner_locks_initiated_per_iteration:FLOAT, +call_combiner_locks_scheduled_items_per_iteration:FLOAT, +call_combiner_set_notify_on_cancel_per_iteration:FLOAT, +call_combiner_cancelled_per_iteration:FLOAT, executor_scheduled_short_items_per_iteration:FLOAT, executor_scheduled_long_items_per_iteration:FLOAT, executor_scheduled_to_self_per_iteration:FLOAT, @@ -86,4 +90,7 @@ executor_wakeup_initiated_per_iteration:FLOAT, executor_queue_drained_per_iteration:FLOAT, executor_push_retries_per_iteration:FLOAT, server_requested_calls_per_iteration:FLOAT, -server_slowpath_requests_queued_per_iteration:FLOAT +server_slowpath_requests_queued_per_iteration:FLOAT, +cq_ev_queue_trylock_failures_per_iteration:FLOAT, +cq_ev_queue_trylock_successes_per_iteration:FLOAT, +cq_ev_queue_transient_pop_failures_per_iteration:FLOAT diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index ef6c4a509b..d832dacb69 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -91,8 +91,17 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, tsi_peer_destruct(&peer); } +static int httpcli_ssl_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_httpcli_ssl_channel_security_connector *c1 = + (grpc_httpcli_ssl_channel_security_connector *)sc1; + grpc_httpcli_ssl_channel_security_connector *c2 = + (grpc_httpcli_ssl_channel_security_connector *)sc2; + return strcmp(c1->secure_peer_name, c2->secure_peer_name); +} + static grpc_security_connector_vtable httpcli_ssl_vtable = { - httpcli_ssl_destroy, httpcli_ssl_check_peer}; + httpcli_ssl_destroy, httpcli_ssl_check_peer, httpcli_ssl_cmp}; static grpc_security_status httpcli_ssl_channel_security_connector_create( grpc_exec_ctx *exec_ctx, const char *pem_root_certs, @@ -123,6 +132,10 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( *sc = NULL; return GRPC_SECURITY_ERROR; } + // We don't actually need a channel credentials object in this case, + // but we set it to a non-NULL address so that we don't trigger + // assertions in grpc_channel_security_connector_cmp(). + c->base.channel_creds = (grpc_channel_credentials *)1; c->base.add_handshakers = httpcli_ssl_add_handshakers; *sc = &c->base; return GRPC_SECURITY_OK; diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index bab3df021a..d45719608b 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -21,6 +21,8 @@ #include <inttypes.h> #include <grpc/support/log.h> +#include "src/core/lib/debug/stats.h" +#include "src/core/lib/profiling/timers.h" grpc_tracer_flag grpc_call_combiner_trace = GRPC_TRACER_INITIALIZER(false, "call_combiner"); @@ -60,6 +62,7 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_error* error DEBUG_ARGS, const char* reason) { + GPR_TIMER_BEGIN("call_combiner_start", 0); if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { gpr_log(GPR_DEBUG, "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR @@ -73,7 +76,10 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, prev_size + 1); } + GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx); if (prev_size == 0) { + GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx); + GPR_TIMER_MARK("call_combiner_initiate", 0); if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY"); } @@ -87,11 +93,13 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, closure->error_data.error = error; gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure); } + GPR_TIMER_END("call_combiner_start", 0); } void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, grpc_call_combiner* call_combiner DEBUG_ARGS, const char* reason) { + GPR_TIMER_BEGIN("call_combiner_stop", 0); if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { gpr_log(GPR_DEBUG, "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", @@ -130,11 +138,13 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { gpr_log(GPR_DEBUG, " queue empty"); } + GPR_TIMER_END("call_combiner_stop", 0); } void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, grpc_call_combiner* call_combiner, grpc_closure* closure) { + GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx); while (true) { // Decode original state. gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); @@ -179,6 +189,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, grpc_call_combiner* call_combiner, grpc_error* error) { + GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx); while (true) { gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); grpc_error* original_error = decode_cancel_state_error(original_state); diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 0e707ef839..53f4b7eaa7 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -165,6 +165,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, lock, cl, last)); if (last == 1) { GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx); + GPR_TIMER_MARK("combiner.initiated", 0); gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, (gpr_atm)exec_ctx); // first element on this list: add it to the list of combiner locks diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 42033d0ba4..1cc6d98491 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -109,6 +109,16 @@ #define GRPC_POSIX_SOCKETUTILS 1 #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 +#elif defined(GPR_OPENBSD) +#define GRPC_HAVE_IFADDRS 1 +#define GRPC_HAVE_IPV6_RECVPKTINFO 1 +#define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#define GRPC_POSIX_SOCKET 1 +#define GRPC_POSIX_SOCKETADDR 1 +#define GRPC_POSIX_SOCKETUTILS 1 +#define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_NACL) #define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc index ab9d60481c..0ae7d7f600 100644 --- a/src/core/lib/profiling/basic_timers.cc +++ b/src/core/lib/profiling/basic_timers.cc @@ -209,9 +209,9 @@ static void init_output() { static void rotate_log() { /* Using malloc here, as this code could end up being called by gpr_malloc */ - gpr_timer_log *new = malloc(sizeof(*new)); + gpr_timer_log *log = static_cast<gpr_timer_log *>(malloc(sizeof(*log))); gpr_once_init(&g_once_init, init_output); - new->num_entries = 0; + log->num_entries = 0; pthread_mutex_lock(&g_mu); if (g_thread_log != NULL) { timer_log_remove(&g_in_progress_logs, g_thread_log); @@ -221,9 +221,9 @@ static void rotate_log() { } else { g_thread_id = g_next_thread_id++; } - timer_log_push_back(&g_in_progress_logs, new); + timer_log_push_back(&g_in_progress_logs, log); pthread_mutex_unlock(&g_mu); - g_thread_log = new; + g_thread_log = log; } static void gpr_timers_log_add(const char *tagstr, marker_type type, diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc index 795ca0660a..cf10bf24c8 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.cc +++ b/src/core/lib/security/credentials/fake/fake_credentials.cc @@ -38,7 +38,8 @@ static grpc_security_status fake_transport_security_create_security_connector( grpc_call_credentials *call_creds, const char *target, const grpc_channel_args *args, grpc_channel_security_connector **sc, grpc_channel_args **new_args) { - *sc = grpc_fake_channel_security_connector_create(call_creds, target, args); + *sc = + grpc_fake_channel_security_connector_create(c, call_creds, target, args); return GRPC_SECURITY_OK; } @@ -46,7 +47,7 @@ static grpc_security_status fake_transport_security_server_create_security_connector( grpc_exec_ctx *exec_ctx, grpc_server_credentials *c, grpc_server_security_connector **sc) { - *sc = grpc_fake_server_security_connector_create(); + *sc = grpc_fake_server_security_connector_create(c); return GRPC_SECURITY_OK; } diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc index 9df69a2a3d..290336adc0 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc @@ -62,7 +62,8 @@ static grpc_security_status ssl_create_security_connector( } } status = grpc_ssl_channel_security_connector_create( - exec_ctx, call_creds, &c->config, target, overridden_target_name, sc); + exec_ctx, creds, call_creds, &c->config, target, overridden_target_name, + sc); if (status != GRPC_SECURITY_OK) { return status; } @@ -128,7 +129,8 @@ static grpc_security_status ssl_server_create_security_connector( grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds, grpc_server_security_connector **sc) { grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; - return grpc_ssl_server_security_connector_create(exec_ctx, &c->config, sc); + return grpc_ssl_server_security_connector_create(exec_ctx, creds, &c->config, + sc); } static grpc_server_credentials_vtable ssl_server_vtable = { diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc index 51844fb91f..80d9a7b77f 100644 --- a/src/core/lib/security/transport/security_connector.cc +++ b/src/core/lib/security/transport/security_connector.cc @@ -136,6 +136,39 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, } } +int grpc_security_connector_cmp(grpc_security_connector *sc, + grpc_security_connector *other) { + if (sc == NULL || other == NULL) return GPR_ICMP(sc, other); + int c = GPR_ICMP(sc->vtable, other->vtable); + if (c != 0) return c; + return sc->vtable->cmp(sc, other); +} + +int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1, + grpc_channel_security_connector *sc2) { + GPR_ASSERT(sc1->channel_creds != NULL); + GPR_ASSERT(sc2->channel_creds != NULL); + int c = GPR_ICMP(sc1->channel_creds, sc2->channel_creds); + if (c != 0) return c; + c = GPR_ICMP(sc1->request_metadata_creds, sc2->request_metadata_creds); + if (c != 0) return c; + c = GPR_ICMP((void *)sc1->check_call_host, (void *)sc2->check_call_host); + if (c != 0) return c; + c = GPR_ICMP((void *)sc1->cancel_check_call_host, + (void *)sc2->cancel_check_call_host); + if (c != 0) return c; + return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers); +} + +int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1, + grpc_server_security_connector *sc2) { + GPR_ASSERT(sc1->server_creds != NULL); + GPR_ASSERT(sc2->server_creds != NULL); + int c = GPR_ICMP(sc1->server_creds, sc2->server_creds); + if (c != 0) return c; + return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers); +} + bool grpc_channel_security_connector_check_call_host( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, grpc_auth_context *auth_context, @@ -199,25 +232,27 @@ void grpc_security_connector_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&sc->refcount)) sc->vtable->destroy(exec_ctx, sc); } -static void connector_pointer_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { +static void connector_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, (grpc_security_connector *)p, - "connector_pointer_arg_destroy"); + "connector_arg_destroy"); } -static void *connector_pointer_arg_copy(void *p) { +static void *connector_arg_copy(void *p) { return GRPC_SECURITY_CONNECTOR_REF((grpc_security_connector *)p, - "connector_pointer_arg_copy"); + "connector_arg_copy"); } -static int connector_pointer_cmp(void *a, void *b) { return GPR_ICMP(a, b); } +static int connector_cmp(void *a, void *b) { + return grpc_security_connector_cmp((grpc_security_connector *)a, + (grpc_security_connector *)b); +} -static const grpc_arg_pointer_vtable connector_pointer_vtable = { - connector_pointer_arg_copy, connector_pointer_arg_destroy, - connector_pointer_cmp}; +static const grpc_arg_pointer_vtable connector_arg_vtable = { + connector_arg_copy, connector_arg_destroy, connector_cmp}; grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) { return grpc_channel_arg_pointer_create((char *)GRPC_ARG_SECURITY_CONNECTOR, - sc, &connector_pointer_vtable); + sc, &connector_arg_vtable); } grpc_security_connector *grpc_security_connector_from_arg(const grpc_arg *arg) { @@ -382,6 +417,32 @@ static void fake_server_check_peer(grpc_exec_ctx *exec_ctx, fake_check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); } +static int fake_channel_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_fake_channel_security_connector *c1 = + (grpc_fake_channel_security_connector *)sc1; + grpc_fake_channel_security_connector *c2 = + (grpc_fake_channel_security_connector *)sc2; + int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base); + if (c != 0) return c; + c = strcmp(c1->target, c2->target); + if (c != 0) return c; + if (c1->expected_targets == NULL || c2->expected_targets == NULL) { + c = GPR_ICMP(c1->expected_targets, c2->expected_targets); + } else { + c = strcmp(c1->expected_targets, c2->expected_targets); + } + if (c != 0) return c; + return GPR_ICMP(c1->is_lb_channel, c2->is_lb_channel); +} + +static int fake_server_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + return grpc_server_security_connector_cmp( + (grpc_server_security_connector *)sc1, + (grpc_server_security_connector *)sc2); +} + static bool fake_channel_check_call_host(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, @@ -418,12 +479,13 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, } static grpc_security_connector_vtable fake_channel_vtable = { - fake_channel_destroy, fake_channel_check_peer}; + fake_channel_destroy, fake_channel_check_peer, fake_channel_cmp}; static grpc_security_connector_vtable fake_server_vtable = { - fake_server_destroy, fake_server_check_peer}; + fake_server_destroy, fake_server_check_peer, fake_server_cmp}; grpc_channel_security_connector *grpc_fake_channel_security_connector_create( + grpc_channel_credentials *channel_creds, grpc_call_credentials *request_metadata_creds, const char *target, const grpc_channel_args *args) { grpc_fake_channel_security_connector *c = @@ -431,6 +493,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; c->base.base.vtable = &fake_channel_vtable; + c->base.channel_creds = channel_creds; c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = fake_channel_check_call_host; @@ -444,13 +507,14 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( } grpc_server_security_connector *grpc_fake_server_security_connector_create( - void) { + grpc_server_credentials *server_creds) { grpc_server_security_connector *c = (grpc_server_security_connector *)gpr_zalloc( sizeof(grpc_server_security_connector)); gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &fake_server_vtable; c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; + c->server_creds = server_creds; c->add_handshakers = fake_server_add_handshakers; return c; } @@ -473,6 +537,7 @@ static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; + grpc_channel_credentials_unref(exec_ctx, c->base.channel_creds); grpc_call_credentials_unref(exec_ctx, c->base.request_metadata_creds); tsi_ssl_client_handshaker_factory_unref(c->client_handshaker_factory); c->client_handshaker_factory = NULL; @@ -485,6 +550,7 @@ static void ssl_server_destroy(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; + grpc_server_credentials_unref(exec_ctx, c->base.server_creds); tsi_ssl_server_handshaker_factory_unref(c->server_handshaker_factory); c->server_handshaker_factory = NULL; gpr_free(sc); @@ -641,6 +707,29 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); } +static int ssl_channel_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_ssl_channel_security_connector *c1 = + (grpc_ssl_channel_security_connector *)sc1; + grpc_ssl_channel_security_connector *c2 = + (grpc_ssl_channel_security_connector *)sc2; + int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base); + if (c != 0) return c; + c = strcmp(c1->target_name, c2->target_name); + if (c != 0) return c; + return (c1->overridden_target_name == NULL || + c2->overridden_target_name == NULL) + ? GPR_ICMP(c1->overridden_target_name, c2->overridden_target_name) + : strcmp(c1->overridden_target_name, c2->overridden_target_name); +} + +static int ssl_server_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + return grpc_server_security_connector_cmp( + (grpc_server_security_connector *)sc1, + (grpc_server_security_connector *)sc2); +} + static void add_shallow_auth_property_to_peer(tsi_peer *peer, const grpc_auth_property *prop, const char *tsi_prop_name) { @@ -717,10 +806,10 @@ static void ssl_channel_cancel_check_call_host( } static grpc_security_connector_vtable ssl_channel_vtable = { - ssl_channel_destroy, ssl_channel_check_peer}; + ssl_channel_destroy, ssl_channel_check_peer, ssl_channel_cmp}; static grpc_security_connector_vtable ssl_server_vtable = { - ssl_server_destroy, ssl_server_check_peer}; + ssl_server_destroy, ssl_server_check_peer, ssl_server_cmp}; /* returns a NULL terminated slice. */ static grpc_slice compute_default_pem_root_certs_once(void) { @@ -804,7 +893,8 @@ const char *grpc_get_default_ssl_roots(void) { } grpc_security_status grpc_ssl_channel_security_connector_create( - grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds, + grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds, + grpc_call_credentials *request_metadata_creds, const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); @@ -840,6 +930,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.vtable = &ssl_channel_vtable; c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; + c->base.channel_creds = grpc_channel_credentials_ref(channel_creds); c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = ssl_channel_check_call_host; @@ -874,8 +965,8 @@ error: } grpc_security_status grpc_ssl_server_security_connector_create( - grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config, - grpc_server_security_connector **sc) { + grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds, + const grpc_ssl_server_config *config, grpc_server_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); const char **alpn_protocol_strings = (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols); @@ -897,6 +988,7 @@ grpc_security_status grpc_ssl_server_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; c->base.base.vtable = &ssl_server_vtable; + c->base.server_creds = grpc_server_credentials_ref(server_creds); result = tsi_create_ssl_server_handshaker_factory_ex( config->pem_key_cert_pairs, config->num_key_cert_pairs, config->pem_root_certs, get_tsi_client_certificate_request_type( diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 4d87cd0c80..216bb35e81 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -60,13 +60,9 @@ typedef struct { void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, grpc_auth_context **auth_context, grpc_closure *on_peer_checked); + int (*cmp)(grpc_security_connector *sc, grpc_security_connector *other); } grpc_security_connector_vtable; -typedef struct grpc_security_connector_handshake_list { - void *handshake; - struct grpc_security_connector_handshake_list *next; -} grpc_security_connector_handshake_list; - struct grpc_security_connector { const grpc_security_connector_vtable *vtable; gpr_refcount refcount; @@ -104,6 +100,10 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_auth_context **auth_context, grpc_closure *on_peer_checked); +/* Compares two security connectors. */ +int grpc_security_connector_cmp(grpc_security_connector *sc, + grpc_security_connector *other); + /* Util to encapsulate the connector in a channel arg. */ grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc); @@ -116,13 +116,14 @@ grpc_security_connector *grpc_security_connector_find_in_args( /* --- channel_security_connector object. --- - A channel security connector object represents away to configure the + A channel security connector object represents a way to configure the underlying transport security mechanism on the client side. */ typedef struct grpc_channel_security_connector grpc_channel_security_connector; struct grpc_channel_security_connector { grpc_security_connector base; + grpc_channel_credentials *channel_creds; grpc_call_credentials *request_metadata_creds; bool (*check_call_host)(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, @@ -138,6 +139,10 @@ struct grpc_channel_security_connector { grpc_handshake_manager *handshake_mgr); }; +/// A helper function for use in grpc_security_connector_cmp() implementations. +int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1, + grpc_channel_security_connector *sc2); + /// Checks that the host that will be set for a call is acceptable. /// Returns true if completed synchronously, in which case \a error will /// be set to indicate the result. Otherwise, \a on_call_host_checked @@ -161,18 +166,23 @@ void grpc_channel_security_connector_add_handshakers( /* --- server_security_connector object. --- - A server security connector object represents away to configure the + A server security connector object represents a way to configure the underlying transport security mechanism on the server side. */ typedef struct grpc_server_security_connector grpc_server_security_connector; struct grpc_server_security_connector { grpc_security_connector base; + grpc_server_credentials *server_creds; void (*add_handshakers)(grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_handshake_manager *handshake_mgr); }; +/// A helper function for use in grpc_security_connector_cmp() implementations. +int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1, + grpc_server_security_connector *sc2); + void grpc_server_security_connector_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_handshake_manager *handshake_mgr); @@ -182,13 +192,14 @@ void grpc_server_security_connector_add_handshakers( /* For TESTING ONLY! Creates a fake connector that emulates real channel security. */ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( + grpc_channel_credentials *channel_creds, grpc_call_credentials *request_metadata_creds, const char *target, const grpc_channel_args *args); /* For TESTING ONLY! Creates a fake connector that emulates real server security. */ grpc_server_security_connector *grpc_fake_server_security_connector_create( - void); + grpc_server_credentials *server_creds); /* Config for ssl clients. */ @@ -211,7 +222,8 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_channel_security_connector_create( - grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds, + grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds, + grpc_call_credentials *request_metadata_creds, const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc); @@ -236,8 +248,8 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_server_security_connector_create( - grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config, - grpc_server_security_connector **sc); + grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds, + const grpc_ssl_server_config *config, grpc_server_security_connector **sc); /* Util. */ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, diff --git a/src/core/lib/support/manual_constructor.h b/src/core/lib/support/manual_constructor.h new file mode 100644 index 0000000000..d753cf98a0 --- /dev/null +++ b/src/core/lib/support/manual_constructor.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_MANUAL_CONSTRUCTOR_H +#define GRPC_CORE_LIB_SUPPORT_MANUAL_CONSTRUCTOR_H + +// manually construct a region of memory with some type + +#include <stddef.h> +#include <new> +#include <type_traits> +#include <utility> + +namespace grpc_core { + +template <typename Type> +class ManualConstructor { + public: + // No constructor or destructor because one of the most useful uses of + // this class is as part of a union, and members of a union could not have + // constructors or destructors till C++11. And, anyway, the whole point of + // this class is to bypass constructor and destructor. + + Type* get() { return reinterpret_cast<Type*>(&space_); } + const Type* get() const { return reinterpret_cast<const Type*>(&space_); } + + Type* operator->() { return get(); } + const Type* operator->() const { return get(); } + + Type& operator*() { return *get(); } + const Type& operator*() const { return *get(); } + + void Init() { new (&space_) Type; } + + // Init() constructs the Type instance using the given arguments + // (which are forwarded to Type's constructor). + // + // Note that Init() with no arguments performs default-initialization, + // not zero-initialization (i.e it behaves the same as "new Type;", not + // "new Type();"), so it will leave non-class types uninitialized. + template <typename... Ts> + void Init(Ts&&... args) { + new (&space_) Type(std::forward<Ts>(args)...); + } + + // Init() that is equivalent to copy and move construction. + // Enables usage like this: + // ManualConstructor<std::vector<int>> v; + // v.Init({1, 2, 3}); + void Init(const Type& x) { new (&space_) Type(x); } + void Init(Type&& x) { new (&space_) Type(std::move(x)); } + + void Destroy() { get()->~Type(); } + + private: + typename std::aligned_storage<sizeof(Type), alignof(Type)>::type space_; +}; + +} // namespace grpc_core + +#endif diff --git a/src/core/lib/support/time_posix.cc b/src/core/lib/support/time_posix.cc index 3267ea6b54..3f8a9094fd 100644 --- a/src/core/lib/support/time_posix.cc +++ b/src/core/lib/support/time_posix.cc @@ -42,7 +42,7 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) { return rv; } -#if _POSIX_TIMERS > 0 +#if _POSIX_TIMERS > 0 || defined(__OpenBSD__) static gpr_timespec gpr_from_timespec(struct timespec ts, gpr_clock_type clock_type) { /* diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 36b4b835f8..21664f03c8 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -362,11 +362,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { grpc_cq_completion *c = NULL; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (gpr_spinlock_trylock(&q->queue_lock)) { - c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx); + + bool is_empty = false; + c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty); gpr_spinlock_unlock(&q->queue_lock); + + if (c == NULL && !is_empty) { + GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx); + } + } else { + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx); } + grpc_exec_ctx_finish(&exec_ctx); + if (c) { gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); } diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index 6ed427ce5c..f1597014b1 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -21,117 +21,64 @@ #include <inttypes.h> #include <stdlib.h> -#include <grpc/support/log.h> #include <grpc/support/useful.h> grpc_tracer_flag grpc_bdp_estimator_trace = GRPC_TRACER_INITIALIZER(false, "bdp_estimator"); -void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) { - estimator->estimate = 65536; - estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; - estimator->ping_start_time = gpr_time_0(GPR_CLOCK_MONOTONIC); - estimator->next_ping_scheduled = 0; - estimator->name = name; - estimator->bw_est = 0; - estimator->inter_ping_delay = 100.0; // start at 100ms - estimator->stable_estimate_count = 0; -} - -bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator, - int64_t *estimate) { - *estimate = estimator->estimate; - return true; -} - -bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, - double *bw) { - *bw = estimator->bw_est; - return true; -} - -void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, - int64_t num_bytes) { - estimator->accumulator += num_bytes; -} - -bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx, - const grpc_bdp_estimator *estimator) { - switch (estimator->ping_state) { - case GRPC_BDP_PING_UNSCHEDULED: - return grpc_exec_ctx_now(exec_ctx) >= estimator->next_ping_scheduled; - case GRPC_BDP_PING_SCHEDULED: - return false; - case GRPC_BDP_PING_STARTED: - return false; - } - GPR_UNREACHABLE_CODE(return false); -} +namespace grpc_core { -void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) { - if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, - estimator->name, estimator->accumulator, estimator->estimate); - } - GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED); - estimator->ping_state = GRPC_BDP_PING_SCHEDULED; - estimator->accumulator = 0; -} +BdpEstimator::BdpEstimator(const char *name) + : ping_state_(PingState::UNSCHEDULED), + accumulator_(0), + estimate_(65536), + ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)), + inter_ping_delay_(100.0), // start at 100ms + stable_estimate_count_(0), + bw_est_(0), + name_(name) {} -void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) { - if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, - estimator->name, estimator->accumulator, estimator->estimate); - } - GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED); - estimator->ping_state = GRPC_BDP_PING_STARTED; - estimator->accumulator = 0; - estimator->ping_start_time = gpr_now(GPR_CLOCK_MONOTONIC); -} - -void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx, - grpc_bdp_estimator *estimator) { +grpc_millis BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec dt_ts = gpr_time_sub(now, estimator->ping_start_time); + gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_); double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; - double bw = dt > 0 ? ((double)estimator->accumulator / dt) : 0; - int start_inter_ping_delay = estimator->inter_ping_delay; + double bw = dt > 0 ? ((double)accumulator_ / dt) : 0; + int start_inter_ping_delay = inter_ping_delay_; if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64 " dt=%lf bw=%lfMbs bw_est=%lfMbs", - estimator->name, estimator->accumulator, estimator->estimate, dt, - bw / 125000.0, estimator->bw_est / 125000.0); + name_, accumulator_, estimate_, dt, bw / 125000.0, + bw_est_ / 125000.0); } - GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED); - if (estimator->accumulator > 2 * estimator->estimate / 3 && - bw > estimator->bw_est) { - estimator->estimate = - GPR_MAX(estimator->accumulator, estimator->estimate * 2); - estimator->bw_est = bw; + GPR_ASSERT(ping_state_ == PingState::STARTED); + if (accumulator_ > 2 * estimate_ / 3 && bw > bw_est_) { + estimate_ = GPR_MAX(accumulator_, estimate_ * 2); + bw_est_ = bw; if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, - estimator->name, estimator->estimate); + gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, name_, + estimate_); } - estimator->inter_ping_delay /= 2; // if the ping estimate changes, - // exponentially get faster at probing - } else if (estimator->inter_ping_delay < 10000) { - estimator->stable_estimate_count++; - if (estimator->stable_estimate_count >= 2) { - estimator->inter_ping_delay += + inter_ping_delay_ /= 2; // if the ping estimate changes, + // exponentially get faster at probing + } else if (inter_ping_delay_ < 10000) { + stable_estimate_count_++; + if (stable_estimate_count_ >= 2) { + inter_ping_delay_ += 100 + (int)(rand() * 100.0 / RAND_MAX); // if the ping estimate is steady, // slowly ramp down the probe time } } - if (start_inter_ping_delay != estimator->inter_ping_delay) { - estimator->stable_estimate_count = 0; + if (start_inter_ping_delay != inter_ping_delay_) { + stable_estimate_count_ = 0; if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", estimator->name, - estimator->inter_ping_delay); + gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", name_, + inter_ping_delay_); } } - estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; - estimator->accumulator = 0; - estimator->next_ping_scheduled = - grpc_exec_ctx_now(exec_ctx) + estimator->inter_ping_delay; + ping_state_ = PingState::UNSCHEDULED; + accumulator_ = 0; + return grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_; } + +} // namespace grpc_core diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index a9d986782c..470c127f7f 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -19,67 +19,83 @@ #ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H #define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H -#include <grpc/support/time.h> +#include <grpc/support/port_platform.h> + +#include <inttypes.h> #include <stdbool.h> #include <stdint.h> + +#include <grpc/support/log.h> +#include <grpc/support/time.h> + #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/exec_ctx.h" -#define GRPC_BDP_SAMPLES 16 -#define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3 +extern grpc_tracer_flag grpc_bdp_estimator_trace; -#ifdef __cplusplus -extern "C" { -#endif +namespace grpc_core { -extern grpc_tracer_flag grpc_bdp_estimator_trace; +class BdpEstimator { + public: + explicit BdpEstimator(const char *name); + ~BdpEstimator() {} -typedef enum { - GRPC_BDP_PING_UNSCHEDULED, - GRPC_BDP_PING_SCHEDULED, - GRPC_BDP_PING_STARTED -} grpc_bdp_estimator_ping_state; + // Returns true if a reasonable estimate could be obtained + bool EstimateBdp(int64_t *estimate_out) const { + *estimate_out = estimate_; + return true; + } + bool EstimateBandwidth(double *bw_out) const { + *bw_out = bw_est_; + return true; + } -typedef struct grpc_bdp_estimator { - grpc_bdp_estimator_ping_state ping_state; - int64_t accumulator; - int64_t estimate; + void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; } + + // Schedule a ping: call in response to receiving a true from + // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a + // transport (but not necessarily started) + void SchedulePing() { + if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { + gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, name_, + accumulator_, estimate_); + } + GPR_ASSERT(ping_state_ == PingState::UNSCHEDULED); + ping_state_ = PingState::SCHEDULED; + accumulator_ = 0; + } + + // Start a ping: call after calling grpc_bdp_estimator_schedule_ping and + // once + // the ping is on the wire + void StartPing() { + if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { + gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, name_, + accumulator_, estimate_); + } + GPR_ASSERT(ping_state_ == PingState::SCHEDULED); + ping_state_ = PingState::STARTED; + accumulator_ = 0; + ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC); + } + + // Completes a previously started ping, returns when to schedule the next one + grpc_millis CompletePing(grpc_exec_ctx *exec_ctx); + + private: + enum class PingState { UNSCHEDULED, SCHEDULED, STARTED }; + + PingState ping_state_; + int64_t accumulator_; + int64_t estimate_; // when was the current ping started? - gpr_timespec ping_start_time; - // when should the next ping start? - grpc_millis next_ping_scheduled; - int inter_ping_delay; - int stable_estimate_count; - double bw_est; - const char *name; -} grpc_bdp_estimator; - -void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name); - -// Returns true if a reasonable estimate could be obtained -bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator, - int64_t *estimate); -// Tracks new bytes read. -bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw); -// Returns true if the user should schedule a ping -void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, - int64_t num_bytes); -// Returns true if the user should schedule a ping -bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx, - const grpc_bdp_estimator *estimator); -// Schedule a ping: call in response to receiving a true from -// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a -// transport (but not necessarily started) -void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator); -// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and once -// the ping is on the wire -void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator); -// Completes a previously started ping -void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx, - grpc_bdp_estimator *estimator); - -#ifdef __cplusplus -} -#endif + gpr_timespec ping_start_time_; + int inter_ping_delay_; + int stable_estimate_count_; + double bw_est_; + const char *name_; +}; + +} // namespace grpc_core #endif /* GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H */ diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 6bd3ecda32..d982a3d2b7 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -266,8 +266,11 @@ class Server::SyncRequestThreadManager : public ThreadManager { WorkStatus PollForWork(void** tag, bool* ok) override { *tag = nullptr; + // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working + // right now gpr_timespec deadline = - gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN); + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN)); switch (server_cq_->AsyncNext(tag, ok, deadline)) { case CompletionQueue::TIMEOUT: diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index 087b685963..f59989655e 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -36,7 +36,21 @@ namespace Grpc.Core readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - internal AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + /// <summary> + /// Creates a new AsyncClientStreamingCall object with the specified properties. + /// </summary> + /// <param name="requestStream">Stream of request values.</param> + /// <param name="responseAsync">The response of the asynchronous call.</param> + /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param> + /// <param name="getStatusFunc">Delegate returning the status of the call.</param> + /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param> + /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param> + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, + Task<TResponse> responseAsync, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) { this.requestStream = requestStream; this.responseAsync = responseAsync; diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index ce49fb1596..1cb1a91859 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -35,7 +35,21 @@ namespace Grpc.Core readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - internal AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + /// <summary> + /// Creates a new AsyncDuplexStreamingCall object with the specified properties. + /// </summary> + /// <param name="requestStream">Stream of request values.</param> + /// <param name="responseStream">Stream of response values.</param> + /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param> + /// <param name="getStatusFunc">Delegate returning the status of the call.</param> + /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param> + /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param> + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, + IAsyncStreamReader<TResponse> responseStream, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index fbc97b8148..4303b0b1b0 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -33,7 +33,19 @@ namespace Grpc.Core readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - internal AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + /// <summary> + /// Creates a new AsyncDuplexStreamingCall object with the specified properties. + /// </summary> + /// <param name="responseStream">Stream of response values.</param> + /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param> + /// <param name="getStatusFunc">Delegate returning the status of the call.</param> + /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param> + /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param> + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) { this.responseStream = responseStream; this.responseHeadersAsync = responseHeadersAsync; diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs index 6348f3c5fd..17747f86ca 100644 --- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -34,7 +34,20 @@ namespace Grpc.Core readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - internal AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + + /// <summary> + /// Creates a new AsyncUnaryCall object with the specified properties. + /// </summary> + /// <param name="responseAsync">The response of the asynchronous call.</param> + /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param> + /// <param name="getStatusFunc">Delegate returning the status of the call.</param> + /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param> + /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param> + public AsyncUnaryCall(Task<TResponse> responseAsync, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) { this.responseAsync = responseAsync; this.responseHeadersAsync = responseHeadersAsync; diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index 82ac2600fa..5672bdad4c 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -18,6 +18,7 @@ #import <UIKit/UIKit.h> #import <XCTest/XCTest.h> +#import <grpc/grpc.h> #import <GRPCClient/GRPCCall.h> #import <GRPCClient/GRPCCall+ChannelArg.h> @@ -30,6 +31,8 @@ #import <RxLibrary/GRXWriter+Immediate.h> #import <RxLibrary/GRXBufferedPipe.h> +#import "version.h" + #define TEST_TIMEOUT 16 static NSString * const kHostAddress = @"localhost:5050"; @@ -266,12 +269,38 @@ static GRPCProtoMethod *kFullDuplexCallMethod; id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { XCTAssertNotNil(value, @"nil value received as response."); XCTAssertEqual([value length], 0, @"Non-empty response received: %@", value); - /* This test needs to be more clever in regards to changing the version of the core. - XCTAssertEqualObjects(call.responseHeaders[@"x-grpc-test-echo-useragent"], - @"Foo grpc-objc/0.13.0 grpc-c/0.14.0-dev (ios)", - @"Did not receive expected user agent %@", - call.responseHeaders[@"x-grpc-test-echo-useragent"]); - */ + + NSString *userAgent = call.responseHeaders[@"x-grpc-test-echo-useragent"]; + NSError *error = nil; + + // Test the regex is correct + NSString *expectedUserAgent = @"Foo grpc-objc/"; + expectedUserAgent = + [expectedUserAgent stringByAppendingString:GRPC_OBJC_VERSION_STRING]; + expectedUserAgent = + [expectedUserAgent stringByAppendingString:@" grpc-c/"]; + expectedUserAgent = + [expectedUserAgent stringByAppendingString:GRPC_C_VERSION_STRING]; + expectedUserAgent = + [expectedUserAgent stringByAppendingString:@" (ios; chttp2; "]; + expectedUserAgent = + [expectedUserAgent stringByAppendingString:[NSString stringWithUTF8String:grpc_g_stands_for()]]; + expectedUserAgent = [expectedUserAgent stringByAppendingString:@")"]; + XCTAssertEqualObjects(userAgent, expectedUserAgent); + + // Change in format of user-agent field in a direction that does not match the regex will likely + // cause problem for certain gRPC users. For details, refer to internal doc https://goo.gl/c2diBc + NSRegularExpression *regex = + [NSRegularExpression regularExpressionWithPattern:@" grpc-[a-zA-Z0-9]+(-[a-zA-Z0-9]+)?/[^ ,]+( \\([^)]*\\))?" + options:0 + error:&error]; + NSString *customUserAgent = + [regex stringByReplacingMatchesInString:userAgent + options:0 + range:NSMakeRange(0, [userAgent length]) + withTemplate:@""]; + XCTAssertEqualObjects(customUserAgent, @"Foo"); + [response fulfill]; } completionHandler:^(NSError *errorOrNil) { XCTAssertNil(errorOrNil, @"Finished with unexpected error: %@", errorOrNil); diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj index b01d5ffcea..2f0c8cfe18 100644 --- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj +++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj @@ -144,6 +144,7 @@ 5EAD6D241E27047400002378 /* CronetUnitTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = CronetUnitTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; }; 5EAD6D261E27047400002378 /* CronetUnitTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = CronetUnitTests.m; sourceTree = "<group>"; }; 5EAD6D281E27047400002378 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; }; + 5EAFE8271F8EFB87007F2189 /* version.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = version.h; sourceTree = "<group>"; }; 5EE84BF11D4717E40050C6CC /* InteropTestsRemoteWithCronet.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = InteropTestsRemoteWithCronet.xctest; sourceTree = BUILT_PRODUCTS_DIR; }; 5EE84BF31D4717E40050C6CC /* InteropTestsRemoteWithCronet.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = InteropTestsRemoteWithCronet.m; sourceTree = "<group>"; }; 5EE84BF51D4717E40050C6CC /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; }; @@ -398,6 +399,7 @@ 635697C91B14FC11007A7283 /* Tests */ = { isa = PBXGroup; children = ( + 5EAFE8271F8EFB87007F2189 /* version.h */, 6312AE4D1B1BF49B00341DEE /* GRPCClientTests.m */, 63E240CC1B6C4D3A005F3B0E /* InteropTests.h */, 635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */, @@ -740,9 +742,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-CronetUnitTests/Pods-CronetUnitTests-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -755,9 +760,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemoteWithCronet/Pods-InteropTestsRemoteWithCronet-frameworks.sh", + "${PODS_ROOT}/CronetFramework/Cronet.framework", ); name = "[CP] Embed Pods Frameworks"; outputPaths = ( + "${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -770,13 +778,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-InteropTestsRemote-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; 4F5690DC0E6AD6663FE78B8B /* [CP] Embed Pods Frameworks */ = { @@ -800,13 +811,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-InteropTestsLocalSSL-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; 5F14F59509E10C2852014F9E /* [CP] Embed Pods Frameworks */ = { @@ -830,9 +844,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsLocalSSL/Pods-InteropTestsLocalSSL-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -845,9 +862,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-CoreCronetEnd2EndTests/Pods-CoreCronetEnd2EndTests-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -860,13 +880,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-InteropTestsLocalCleartext-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; 796680C7599CB4ED736DD62A /* [CP] Check Pods Manifest.lock */ = { @@ -875,13 +898,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-Tests-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; 80E2DDD2EC04A4009F45E933 /* [CP] Check Pods Manifest.lock */ = { @@ -890,13 +916,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-CronetUnitTests-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; 8AD3130D3C58A0FB32FF2A36 /* [CP] Copy Pods Resources */ = { @@ -905,9 +934,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsLocalCleartext/Pods-InteropTestsLocalCleartext-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -935,13 +967,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-AllTests-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; A441F71824DCB9D0CA297748 /* [CP] Copy Pods Resources */ = { @@ -950,9 +985,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-AllTests/Pods-AllTests-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -965,9 +1003,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-CronetUnitTests/Pods-CronetUnitTests-frameworks.sh", + "${PODS_ROOT}/CronetFramework/Cronet.framework", ); name = "[CP] Embed Pods Frameworks"; outputPaths = ( + "${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -995,9 +1036,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-Tests/Pods-Tests-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -1010,13 +1054,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-RxLibraryUnitTests-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; C0F7B1FF6F88CC5FBF362F4C /* [CP] Check Pods Manifest.lock */ = { @@ -1025,13 +1072,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-InteropTestsRemoteWithCronet-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; C2E09DC4BD239F71160F0CC1 /* [CP] Copy Pods Resources */ = { @@ -1040,9 +1090,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemote/Pods-InteropTestsRemote-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -1070,9 +1123,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-RxLibraryUnitTests/Pods-RxLibraryUnitTests-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -1085,9 +1141,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemoteWithCronet/Pods-InteropTestsRemoteWithCronet-resources.sh", + $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle, ); name = "[CP] Copy Pods Resources"; outputPaths = ( + "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -1100,9 +1159,12 @@ files = ( ); inputPaths = ( + "${SRCROOT}/Pods/Target Support Files/Pods-CoreCronetEnd2EndTests/Pods-CoreCronetEnd2EndTests-frameworks.sh", + "${PODS_ROOT}/CronetFramework/Cronet.framework", ); name = "[CP] Embed Pods Frameworks"; outputPaths = ( + "${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; @@ -1115,13 +1177,16 @@ files = ( ); inputPaths = ( + "${PODS_PODFILE_DIR_PATH}/Podfile.lock", + "${PODS_ROOT}/Manifest.lock", ); name = "[CP] Check Pods Manifest.lock"; outputPaths = ( + "$(DERIVED_FILE_DIR)/Pods-CoreCronetEnd2EndTests-checkManifestLockResult.txt", ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n"; + shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; /* End PBXShellScriptBuildPhase section */ diff --git a/src/objective-c/tests/version.h b/src/objective-c/tests/version.h new file mode 100644 index 0000000000..02515063fa --- /dev/null +++ b/src/objective-c/tests/version.h @@ -0,0 +1,27 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// This file is autogenerated from a template file. Please make +// modifications to +// `templates/src/objective-c/GRPCClient/private/version.h.template` +// instead. This file can be regenerated from the template by running +// `tools/buildgen/generate_projects.sh`. + + +#define GRPC_OBJC_VERSION_STRING @"1.8.0-dev" +#define GRPC_C_VERSION_STRING @"5.0.0-dev" diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index e5ab02b9fc..9d2cf2a08a 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -41,6 +41,7 @@ LIB_DIRS = [ ] windows = RUBY_PLATFORM =~ /mingw|mswin/ +bsd = RUBY_PLATFORM =~ /bsd/ grpc_root = File.expand_path(File.join(File.dirname(__FILE__), '../../../..')) @@ -70,7 +71,8 @@ unless windows puts 'Building internal gRPC into ' + grpc_lib_dir nproc = 4 nproc = Etc.nprocessors * 2 if Etc.respond_to? :nprocessors - system("make -j#{nproc} -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config} Q=") + make = bsd ? 'gmake' : 'make' + system("#{make} -j#{nproc} -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config} Q=") exit 1 unless $? == 0 end |