aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/c-ares/gen_build_yaml.py12
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc119
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.cc7
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h19
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.cc3
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc669
-rw-r--r--src/core/lib/debug/stats_data.cc18
-rw-r--r--src/core/lib/debug/stats_data.h27
-rw-r--r--src/core/lib/debug/stats_data.yaml21
-rw-r--r--src/core/lib/debug/stats_data_bq_schema.sql9
-rw-r--r--src/core/lib/http/httpcli_security_connector.cc15
-rw-r--r--src/core/lib/iomgr/call_combiner.cc11
-rw-r--r--src/core/lib/iomgr/combiner.cc1
-rw-r--r--src/core/lib/iomgr/port.h10
-rw-r--r--src/core/lib/profiling/basic_timers.cc8
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.cc5
-rw-r--r--src/core/lib/security/credentials/ssl/ssl_credentials.cc6
-rw-r--r--src/core/lib/security/transport/security_connector.cc126
-rw-r--r--src/core/lib/security/transport/security_connector.h34
-rw-r--r--src/core/lib/support/manual_constructor.h76
-rw-r--r--src/core/lib/support/time_posix.cc2
-rw-r--r--src/core/lib/surface/completion_queue.cc15
-rw-r--r--src/core/lib/transport/bdp_estimator.cc127
-rw-r--r--src/core/lib/transport/bdp_estimator.h120
-rw-r--r--src/cpp/server/server_cc.cc5
-rw-r--r--src/csharp/Grpc.Core/AsyncClientStreamingCall.cs16
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs16
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs14
-rw-r--r--src/csharp/Grpc.Core/AsyncUnaryCall.cs15
-rw-r--r--src/objective-c/tests/GRPCClientTests.m41
-rw-r--r--src/objective-c/tests/Tests.xcodeproj/project.pbxproj83
-rw-r--r--src/objective-c/tests/version.h27
-rw-r--r--src/ruby/ext/grpc/extconf.rb4
33 files changed, 1060 insertions, 621 deletions
diff --git a/src/c-ares/gen_build_yaml.py b/src/c-ares/gen_build_yaml.py
index 6f8b7485ac..4600d8d224 100755
--- a/src/c-ares/gen_build_yaml.py
+++ b/src/c-ares/gen_build_yaml.py
@@ -29,10 +29,14 @@ try:
subprocess.call("third_party/cares/cares/configure", shell=True)
def config_platform(x):
- if 'linux' in sys.platform:
- return 'src/cares/cares/config_linux/ares_config.h'
if 'darwin' in sys.platform:
return 'src/cares/cares/config_darwin/ares_config.h'
+ if 'freebsd' in sys.platform:
+ return 'src/cares/cares/config_freebsd/ares_config.h'
+ if 'linux' in sys.platform:
+ return 'src/cares/cares/config_linux/ares_config.h'
+ if 'openbsd' in sys.platform:
+ return 'src/cares/cares/config_openbsd/ares_config.h'
if not os.path.isfile('third_party/cares/cares/ares_config.h'):
gen_ares_build(x)
return 'third_party/cares/cares/ares_config.h'
@@ -124,8 +128,10 @@ try:
"third_party/cares/cares/config-win32.h",
"third_party/cares/cares/setup_once.h",
"third_party/cares/ares_build.h",
+ "third_party/cares/config_darwin/ares_config.h",
+ "third_party/cares/config_freebsd/ares_config.h",
"third_party/cares/config_linux/ares_config.h",
- "third_party/cares/config_darwin/ares_config.h"
+ "third_party/cares/config_openbsd/ares_config.h"
],
}]
except:
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index e4b19a2c4a..9462d1085e 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -152,10 +152,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
+static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error);
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error);
+static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx,
+ void *tp, grpc_error *error);
static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
@@ -218,6 +222,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->write_cb_pool = next;
}
+ t->flow_control.bdp_estimator.Destroy();
+
+ GRPC_ERROR_UNREF(t->closed_with_error);
gpr_free(t->ping_acks);
gpr_free(t->peer_string);
gpr_free(t);
@@ -303,6 +310,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
+ next_bdp_ping_timer_expired_locked, t,
+ grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
t, grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
@@ -315,7 +325,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
- grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string);
+ t->flow_control.bdp_estimator.Init(t->peer_string);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@@ -562,6 +572,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
+ if (t->flow_control.enable_bdp_probe) {
+ GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
+ schedule_bdp_ping_locked(exec_ctx, t);
+ }
+
grpc_chttp2_act_on_flowctl_action(
exec_ctx,
grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
@@ -595,7 +610,9 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
- if (!t->closed) {
+ end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
+ cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
if (!grpc_error_has_clear_grpc_status(error)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
@@ -610,13 +627,16 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error_add_child(t->close_transport_on_writes_finished, error);
return;
}
- t->closed = 1;
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ t->closed_with_error = GRPC_ERROR_REF(error);
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
if (t->ping_state.is_delayed_ping_timer_set) {
grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer);
}
+ if (t->have_next_bdp_ping_timer) {
+ grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer);
+ }
switch (t->keepalive_state) {
case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
@@ -636,8 +656,8 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
- end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
- cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
+ GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
+ grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -940,7 +960,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
+ if (t->closed_with_error == GRPC_ERROR_NONE &&
+ grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
}
}
@@ -993,7 +1014,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_chttp2_begin_write_result r;
- if (t->closed) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
r.writing = false;
} else {
r = grpc_chttp2_begin_write(exec_ctx, t);
@@ -1456,7 +1477,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (!s->write_closed) {
if (t->is_client) {
- if (!t->closed) {
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_ASSERT(s->id == 0);
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(exec_ctx, t);
@@ -1464,7 +1485,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_cancel_stream(
exec_ctx, t, s,
grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Transport closed", &t->closed_with_error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
} else {
@@ -1686,6 +1708,7 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
grpc_chttp2_ping_queue *pq = &t->ping_queue;
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]);
@@ -1695,6 +1718,12 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure *on_initiate, grpc_closure *on_ack) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_SCHED(exec_ctx, on_initiate,
+ GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error));
+ return;
+ }
grpc_chttp2_ping_queue *pq = &t->ping_queue;
grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
GRPC_ERROR_NONE);
@@ -1753,7 +1782,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
/*The transport will be closed after the write is done */
close_transport_locked(
- exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"));
+ exec_ctx, t, grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
}
@@ -2432,12 +2463,6 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
}
}
- if (action.need_ping) {
- GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
- grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator);
- send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
- &t->finish_bdp_ping_locked);
- }
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2487,14 +2512,13 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
}
GPR_SWAP(grpc_error *, err, error);
GRPC_ERROR_UNREF(err);
- if (!t->closed) {
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
- grpc_bdp_estimator_add_incoming_bytes(
- &t->flow_control.bdp_estimator,
+ t->flow_control.bdp_estimator->AddIncomingBytes(
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
@@ -2528,13 +2552,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("post_reading_action_locked", 0);
bool keep_reading = false;
- if (error == GRPC_ERROR_NONE && t->closed) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed");
+ if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Transport closed", &t->closed_with_error, 1);
}
if (error != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
- } else if (!t->closed) {
+ } else if (t->closed_with_error == GRPC_ERROR_NONE) {
keep_reading = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
}
@@ -2559,28 +2584,57 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action_locked", 0);
}
+// t is reffed prior to calling the first time, and once the callback chain
+// that kicks off finishes, it's unreffed
+static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ t->flow_control.bdp_estimator->SchedulePing();
+ send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
+ &t->finish_bdp_ping_locked);
+}
+
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
if (GRPC_TRACER_ON(grpc_http_trace)) {
- gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
+ gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string,
+ grpc_error_string(error));
}
/* Reset the keepalive ping timer */
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
}
- grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator);
+ t->flow_control.bdp_estimator->StartPing();
}
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
if (GRPC_TRACER_ON(grpc_http_trace)) {
- gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
+ gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string,
+ grpc_error_string(error));
+ }
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+ return;
}
- grpc_bdp_estimator_complete_ping(exec_ctx, &t->flow_control.bdp_estimator);
+ grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx);
+ GPR_ASSERT(!t->have_next_bdp_ping_timer);
+ t->have_next_bdp_ping_timer = true;
+ grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping,
+ &t->next_bdp_ping_timer_expired_locked);
+}
- GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx,
+ void *tp, grpc_error *error) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
+ GPR_ASSERT(t->have_next_bdp_ping_timer);
+ t->have_next_bdp_ping_timer = false;
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+ return;
+ }
+ schedule_bdp_ping_locked(exec_ctx, t);
}
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
@@ -2645,7 +2699,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
- if (t->destroying || t->closed) {
+ if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
} else if (error == GRPC_ERROR_NONE) {
if (t->keepalive_permit_without_calls ||
@@ -2703,8 +2757,11 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
- close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "keepalive watchdog timeout"));
+ close_transport_locked(
+ exec_ctx, t,
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "keepalive watchdog timeout"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL));
}
} else {
/* The watchdog timer should have been cancelled by
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc
index 2428e2526d..d0e80c4bd5 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.cc
+++ b/src/core/ext/transport/chttp2/transport/flow_control.cc
@@ -459,12 +459,9 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
}
}
if (tfc->enable_bdp_probe) {
- action.need_ping =
- grpc_bdp_estimator_need_ping(exec_ctx, &tfc->bdp_estimator);
-
// get bdp estimate and update initial_window accordingly.
int64_t estimate = -1;
- if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) {
+ if (tfc->bdp_estimator->EstimateBdp(&estimate)) {
double target = 1 + log2((double)estimate);
// target might change based on how much memory pressure we are under
@@ -491,7 +488,7 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
// get bandwidth estimate and update max_frame accordingly.
double bw_dbl = -1;
- if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) {
+ if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) {
// we target the max of BDP or bandwidth in microseconds.
int32_t frame_size = (int32_t)GPR_CLAMP(
GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 5d27bb8d67..703f3ba348 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -37,6 +37,7 @@
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/pid_controller.h"
@@ -268,7 +269,7 @@ typedef struct {
bool enable_bdp_probe;
/* bdp estimation */
- grpc_bdp_estimator bdp_estimator;
+ grpc_core::ManualConstructor<grpc_core::BdpEstimator> bdp_estimator;
/* pid controller */
bool pid_controller_initialized;
@@ -297,7 +298,7 @@ struct grpc_chttp2_transport {
/** is the transport destroying itself? */
uint8_t destroying;
/** has the upper layer closed the transport? */
- uint8_t closed;
+ grpc_error *closed_with_error;
/** is there a read request to the endpoint outstanding? */
uint8_t endpoint_reading;
@@ -339,7 +340,7 @@ struct grpc_chttp2_transport {
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
/** is this a client? */
- uint8_t is_client;
+ bool is_client;
/** data to write next write */
grpc_slice_buffer qbuf;
@@ -349,14 +350,14 @@ struct grpc_chttp2_transport {
uint32_t write_buffer_size;
/** have we seen a goaway */
- uint8_t seen_goaway;
+ bool seen_goaway;
/** have we sent a goaway */
grpc_chttp2_sent_goaway_state sent_goaway_state;
/** are the local settings dirty and need to be sent? */
- uint8_t dirtied_local_settings;
+ bool dirtied_local_settings;
/** have local settings been sent? */
- uint8_t sent_local_settings;
+ bool sent_local_settings;
/** bitmask of setting indexes to send out */
uint32_t force_send_settings;
/** settings values */
@@ -421,6 +422,7 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool;
/* bdp estimator */
+ grpc_closure next_bdp_ping_timer_expired_locked;
grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked;
@@ -441,6 +443,10 @@ struct grpc_chttp2_transport {
/** destructive cleanup closure */
grpc_closure destructive_reclaimer_locked;
+ /* next bdp ping timer */
+ bool have_next_bdp_ping_timer;
+ grpc_timer next_bdp_ping_timer;
+
/* keep-alive ping support */
/** Closure to initialize a keepalive ping */
grpc_closure init_keepalive_ping_locked;
@@ -748,7 +754,6 @@ typedef struct {
grpc_chttp2_flowctl_urgency send_setting_update;
uint32_t initial_window_size;
uint32_t max_frame_size;
- bool need_ping;
} grpc_chttp2_flowctl_action;
// Reads the flow control data and returns and actionable struct that will tell
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 25c1a5ef05..c6fecf2ee9 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -245,7 +245,8 @@ class WriteContext {
void UpdateStreamsNoLongerStalled() {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
- if (!t_->closed && grpc_chttp2_list_add_writable_stream(t_, s)) {
+ if (t_->closed_with_error == GRPC_ERROR_NONE &&
+ grpc_chttp2_list_add_writable_stream(t_, s)) {
if (!stream_ref_if_not_destroyed(&s->refcount->refs)) {
grpc_chttp2_list_remove_writable_stream(t_, s);
}
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 1001d74c22..67a8358927 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -62,96 +62,22 @@ typedef struct inproc_transport {
struct inproc_stream *stream_list;
} inproc_transport;
-typedef struct sb_list_entry {
- grpc_slice_buffer sb;
- struct sb_list_entry *next;
-} sb_list_entry;
-
-// Specialize grpc_byte_stream for our use case
-typedef struct {
- grpc_byte_stream base;
- sb_list_entry *le;
- grpc_error *shutdown_error;
-} inproc_slice_byte_stream;
-
-typedef struct {
- // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases
- sb_list_entry *head;
- sb_list_entry *tail;
-} slice_buffer_list;
-
-static void slice_buffer_list_init(slice_buffer_list *l) {
- l->head = NULL;
- l->tail = NULL;
-}
-
-static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) {
- grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb);
- gpr_free(le);
-}
-
-static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx,
- slice_buffer_list *l) {
- sb_list_entry *curr = l->head;
- while (curr != NULL) {
- sb_list_entry *le = curr;
- curr = curr->next;
- sb_list_entry_destroy(exec_ctx, le);
- }
- l->head = NULL;
- l->tail = NULL;
-}
-
-static bool slice_buffer_list_empty(slice_buffer_list *l) {
- return l->head == NULL;
-}
-
-static void slice_buffer_list_append_entry(slice_buffer_list *l,
- sb_list_entry *next) {
- next->next = NULL;
- if (l->tail) {
- l->tail->next = next;
- l->tail = next;
- } else {
- l->head = next;
- l->tail = next;
- }
-}
-
-static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) {
- sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next));
- grpc_slice_buffer_init(&next->sb);
- slice_buffer_list_append_entry(l, next);
- return &next->sb;
-}
-
-static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) {
- sb_list_entry *ret = l->head;
- l->head = l->head->next;
- if (l->head == NULL) {
- l->tail = NULL;
- }
- return ret;
-}
-
typedef struct inproc_stream {
inproc_transport *t;
grpc_metadata_batch to_read_initial_md;
uint32_t to_read_initial_md_flags;
bool to_read_initial_md_filled;
- slice_buffer_list to_read_message;
grpc_metadata_batch to_read_trailing_md;
bool to_read_trailing_md_filled;
- bool reads_needed;
- bool read_closure_scheduled;
- grpc_closure read_closure;
+ bool ops_needed;
+ bool op_closure_scheduled;
+ grpc_closure op_closure;
// Write buffer used only during gap at init time when client-side
// stream is set up but server side stream is not yet set up
grpc_metadata_batch write_buffer_initial_md;
bool write_buffer_initial_md_filled;
uint32_t write_buffer_initial_md_flags;
grpc_millis write_buffer_deadline;
- slice_buffer_list write_buffer_message;
grpc_metadata_batch write_buffer_trailing_md;
bool write_buffer_trailing_md_filled;
grpc_error *write_buffer_cancel_error;
@@ -164,11 +90,15 @@ typedef struct inproc_stream {
gpr_arena *arena;
+ grpc_transport_stream_op_batch *send_message_op;
+ grpc_transport_stream_op_batch *send_trailing_md_op;
grpc_transport_stream_op_batch *recv_initial_md_op;
grpc_transport_stream_op_batch *recv_message_op;
grpc_transport_stream_op_batch *recv_trailing_md_op;
- inproc_slice_byte_stream recv_message_stream;
+ grpc_slice_buffer recv_message;
+ grpc_slice_buffer_stream recv_stream;
+ bool recv_inited;
bool initial_md_sent;
bool trailing_md_sent;
@@ -187,54 +117,11 @@ typedef struct inproc_stream {
struct inproc_stream *stream_list_next;
} inproc_stream;
-static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs, size_t max,
- grpc_closure *on_complete) {
- // Because inproc transport always provides the entire message atomically,
- // the byte stream always has data available when this function is called.
- // Thus, this function always returns true (unlike other transports) and
- // there is never any need to schedule a closure
- return true;
-}
-
-static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs,
- grpc_slice *slice) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- if (stream->shutdown_error != GRPC_ERROR_NONE) {
- return GRPC_ERROR_REF(stream->shutdown_error);
- }
- *slice = grpc_slice_buffer_take_first(&stream->le->sb);
- return GRPC_ERROR_NONE;
-}
-
-static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs,
- grpc_error *error) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- GRPC_ERROR_UNREF(stream->shutdown_error);
- stream->shutdown_error = error;
-}
-
-static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- sb_list_entry_destroy(exec_ctx, stream->le);
- GRPC_ERROR_UNREF(stream->shutdown_error);
-}
-
-static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
- inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
- inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
-
-void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
- sb_list_entry *le) {
- s->base.length = (uint32_t)le->sb.length;
- s->base.flags = 0;
- s->base.vtable = &inproc_slice_byte_stream_vtable;
- s->le = le;
- s->shutdown_error = GRPC_ERROR_NONE;
-}
+static grpc_closure do_nothing_closure;
+static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
+ grpc_error *error);
+static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
static void ref_transport(inproc_transport *t) {
INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
@@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s,
static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);
- slice_buffer_list_destroy(exec_ctx, &s->to_read_message);
- slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message);
GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
GRPC_ERROR_UNREF(s->cancel_self_error);
GRPC_ERROR_UNREF(s->cancel_other_error);
+ if (s->recv_inited) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message);
+ }
+
unref_transport(exec_ctx, s->t);
if (s->closure_at_destroy) {
@@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
}
}
-static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-
static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client,
bool is_initial) {
for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL;
@@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->write_buffer_initial_md_filled = false;
grpc_metadata_batch_init(&s->write_buffer_trailing_md);
s->write_buffer_trailing_md_filled = false;
- slice_buffer_list_init(&s->to_read_message);
- slice_buffer_list_init(&s->write_buffer_message);
- s->reads_needed = false;
- s->read_closure_scheduled = false;
- GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s,
+ s->ops_needed = false;
+ s->op_closure_scheduled = false;
+ GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
grpc_schedule_on_exec_ctx);
s->t = t;
s->closure_at_destroy = NULL;
@@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md);
cs->write_buffer_initial_md_filled = false;
}
- while (!slice_buffer_list_empty(&cs->write_buffer_message)) {
- slice_buffer_list_append_entry(
- &s->to_read_message,
- slice_buffer_list_pophead(&cs->write_buffer_message));
- }
if (cs->write_buffer_trailing_md_filled) {
fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0,
&s->to_read_trailing_md, NULL,
@@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
}
}
+// Call the on_complete closure associated with this stream_op_batch if
+// this stream_op_batch is only one of the pending operations for this
+// stream. This is called when one of the pending operations for the stream
+// is done and about to be NULLed out
+static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *s, grpc_error *error,
+ grpc_transport_stream_op_batch *op,
+ const char *msg) {
+ int is_sm = (int)(op == s->send_message_op);
+ int is_stm = (int)(op == s->send_trailing_md_op);
+ int is_rim = (int)(op == s->recv_initial_md_op);
+ int is_rm = (int)(op == s->recv_message_op);
+ int is_rtm = (int)(op == s->recv_trailing_md_op);
+
+ if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
+ INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error));
+ }
+}
+
+static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *s,
+ grpc_error *error) {
+ if (s && s->ops_needed && !s->op_closure_scheduled) {
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error));
+ s->op_closure_scheduled = true;
+ s->ops_needed = false;
+ }
+}
+
static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error) {
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
if (!s->trailing_md_sent) {
@@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(error);
}
- if (other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
- GRPC_ERROR_REF(error));
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, other, error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
}
@@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
err);
// Last use of err so no need to REF and then UNREF it
- if ((s->recv_initial_md_op != s->recv_message_op) &&
- (s->recv_initial_md_op != s->recv_trailing_md_op)) {
- INPROC_LOG(GPR_DEBUG,
- "fail_helper %p scheduling initial-metadata-on-complete %p",
- error, s);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
- GRPC_ERROR_REF(error));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_initial_md_op,
+ "fail_helper scheduling recv-initial-metadata-on-complete");
s->recv_initial_md_op = NULL;
}
if (s->recv_message_op) {
@@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p",
- s, error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(error));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_message_op,
+ "fail_helper scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
+ if (s->send_message_op) {
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->send_message_op,
+ "fail_helper scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ if (s->send_trailing_md_op) {
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->send_trailing_md_op,
+ "fail_helper scheduling send-trailng-md-on-complete");
+ s->send_trailing_md_op = NULL;
+ }
if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling trailing-md-on-complete %p", s,
error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_REF(error));
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_trailing_md_op,
+ "fail_helper scheduling recv-trailing-metadata-on-complete");
s->recv_trailing_md_op = NULL;
}
close_other_side_locked(exec_ctx, s, "fail_helper:other_side");
@@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_ERROR_UNREF(error);
}
-static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *sender,
+ inproc_stream *receiver) {
+ size_t remaining =
+ sender->send_message_op->payload->send_message.send_message->length;
+ if (receiver->recv_inited) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message);
+ }
+ grpc_slice_buffer_init(&receiver->recv_message);
+ receiver->recv_inited = true;
+ do {
+ grpc_slice message_slice;
+ grpc_closure unused;
+ GPR_ASSERT(grpc_byte_stream_next(
+ exec_ctx, sender->send_message_op->payload->send_message.send_message,
+ SIZE_MAX, &unused));
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, sender->send_message_op->payload->send_message.send_message,
+ &message_slice);
+ if (error != GRPC_ERROR_NONE) {
+ cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error));
+ break;
+ }
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ remaining -= GRPC_SLICE_LENGTH(message_slice);
+ grpc_slice_buffer_add(&receiver->recv_message, message_slice);
+ } while (remaining > 0);
+
+ grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
+ 0);
+ *receiver->recv_message_op->payload->recv_message.recv_message =
+ &receiver->recv_stream.base;
+ INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
+ receiver);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ receiver->recv_message_op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_NONE);
+ complete_if_batch_end_locked(
+ exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op,
+ "message_transfer scheduling sender on_complete");
+ complete_if_batch_end_locked(
+ exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
+ "message_transfer scheduling receiver on_complete");
+
+ receiver->recv_message_op = NULL;
+ sender->send_message_op = NULL;
+}
+
+static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
- // and then return to reads_needed state if still needed
+ // and then return to ops_needed state if still needed
// Since this is a closure directly invoked by the combiner, it should not
// unref the error parameter explicitly; the combiner will do that implicitly
@@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
bool needs_close = false;
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg);
inproc_stream *s = (inproc_stream *)arg;
gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed
gpr_mu_lock(mu);
- s->read_closure_scheduled = false;
+ s->op_closure_scheduled = false;
// cancellation takes precedence
+ inproc_stream *other = s->other_side;
+
if (s->cancel_self_error != GRPC_ERROR_NONE) {
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error));
goto done;
@@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
goto done;
}
- if (s->recv_initial_md_op) {
- if (!s->to_read_initial_md_filled) {
- // We entered the state machine on some other kind of read even though
- // we still haven't satisfied initial md . That's an error.
- new_err =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing");
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for no "
- "initial md %p",
- s, new_err);
+ if (s->send_message_op && other) {
+ if (other->recv_message_op) {
+ message_transfer_locked(exec_ctx, s, other);
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ } else if (!s->t->is_client &&
+ (s->trailing_md_sent || other->recv_trailing_md_op)) {
+ // A server send will never be matched if the client is waiting
+ // for trailing metadata already
+ complete_if_batch_end_locked(
+ exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ }
+ // Pause a send trailing metadata if there is still an outstanding
+ // send message unless we know that the send message will never get
+ // matched to a receive. This happens on the client if the server has
+ // already sent status.
+ if (s->send_trailing_md_op &&
+ (!s->send_message_op ||
+ (s->t->is_client &&
+ (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
+ grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
+ : &other->to_read_trailing_md;
+ bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
+ : &other->to_read_trailing_md_filled;
+ if (*destfilled || s->trailing_md_sent) {
+ // The buffer is already in use; that's an error!
+ INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
+ new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
- } else if (s->initial_md_recvd) {
+ } else {
+ if (other && !other->closed) {
+ fill_in_metadata(exec_ctx, s,
+ s->send_trailing_md_op->payload->send_trailing_metadata
+ .send_trailing_metadata,
+ 0, dest, NULL, destfilled);
+ }
+ s->trailing_md_sent = true;
+ if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling trailing-md-on-complete", s);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
+ GRPC_ERROR_NONE);
+ s->recv_trailing_md_op = NULL;
+ needs_close = true;
+ }
+ }
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ complete_if_batch_end_locked(
+ exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op,
+ "op_state_machine scheduling send-trailing-metadata-on-complete");
+ s->send_trailing_md_op = NULL;
+ }
+ if (s->recv_initial_md_op) {
+ if (s->initial_md_recvd) {
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for already "
+ "op_state_machine %p scheduling on_complete errors for already "
"recvd initial md %p",
s, new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
}
- s->initial_md_recvd = true;
- new_err = fill_in_metadata(
- exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
- s->recv_initial_md_op->payload->recv_initial_metadata
- .recv_initial_metadata,
- s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL);
- s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata
- ->deadline = s->deadline;
- grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
- s->to_read_initial_md_filled = false;
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling initial-metadata-ready %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx,
- s->recv_initial_md_op->payload->recv_initial_metadata
- .recv_initial_metadata_ready,
- GRPC_ERROR_REF(new_err));
- if ((s->recv_initial_md_op != s->recv_message_op) &&
- (s->recv_initial_md_op != s->recv_trailing_md_op)) {
- INPROC_LOG(
- GPR_DEBUG,
- "read_state_machine %p scheduling initial-metadata-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
- s->recv_initial_md_op = NULL;
-
- if (new_err != GRPC_ERROR_NONE) {
+ if (s->to_read_initial_md_filled) {
+ s->initial_md_recvd = true;
+ new_err = fill_in_metadata(
+ exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata,
+ s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
+ NULL);
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata->deadline = s->deadline;
+ grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
+ s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors2 %p", s,
+ "op_state_machine %p scheduling initial-metadata-ready %p", s,
new_err);
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
- goto done;
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata_ready,
+ GRPC_ERROR_REF(new_err));
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_initial_md_op,
+ "op_state_machine scheduling recv-initial-metadata-on-complete");
+ s->recv_initial_md_op = NULL;
+
+ if (new_err != GRPC_ERROR_NONE) {
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling on_complete errors2 %p", s,
+ new_err);
+ fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
+ goto done;
+ }
}
}
- if (s->to_read_initial_md_filled) {
- new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame");
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
- goto done;
- }
- if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) {
- inproc_slice_byte_stream_init(
- &s->recv_message_stream,
- slice_buffer_list_pophead(&s->to_read_message));
- *s->recv_message_op->payload->recv_message.recv_message =
- &s->recv_message_stream.base;
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
- GRPC_CLOSURE_SCHED(
- exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
- GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
+ if (s->recv_message_op) {
+ if (other && other->send_message_op) {
+ message_transfer_locked(exec_ctx, other, s);
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
}
- s->recv_message_op = NULL;
+ }
+ if (s->recv_trailing_md_op && s->t->is_client && other &&
+ other->send_message_op) {
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
if (s->trailing_md_recvd) {
@@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for already "
+ "op_state_machine %p scheduling on_complete errors for already "
"recvd trailing md %p",
s, new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
@@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
if (s->recv_message_op != NULL) {
// This message needs to be wrapped up because it will never be
// satisfied
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready",
- s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_message_op,
+ "op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
+ if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
+ // Nothing further will try to receive from this stream, so finish off
+ // any outstanding send_message op
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
if (s->recv_trailing_md_op != NULL) {
// We wanted trailing metadata and we got it
s->trailing_md_recvd = true;
@@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// (If the server hasn't already sent its trailing md, it doesn't have
// a final status, so don't mark this op complete)
if (s->t->is_client || s->trailing_md_sent) {
- INPROC_LOG(
- GPR_DEBUG,
- "read_state_machine %p scheduling trailing-md-on-complete %p", s,
- new_err);
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling trailing-md-on-complete %p",
+ s, new_err);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = NULL;
needs_close = true;
} else {
INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p server needs to delay handling "
+ "op_state_machine %p server needs to delay handling "
"trailing-md-on-complete %p",
s, new_err);
}
} else {
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p has trailing md but not yet waiting for it",
- s);
+ "op_state_machine %p has trailing md but not yet waiting for it", s);
}
}
if (s->trailing_md_recvd && s->recv_message_op) {
// No further message will come on this stream, so finish off the
// recv_message_op
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_message_op,
+ "op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
- if (s->recv_message_op || s->recv_trailing_md_op) {
+ if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
+ s->send_message_op) {
+ // Nothing further will try to receive from this stream, so finish off
+ // any outstanding send_message op
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
+ s->recv_message_op || s->recv_trailing_md_op) {
// Didn't get the item we wanted so we still need to get
// rescheduled
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s,
- s->recv_message_op, s->recv_trailing_md_op);
- s->reads_needed = true;
+ INPROC_LOG(
+ GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s,
+ s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
+ s->recv_message_op, s->recv_trailing_md_op);
+ s->ops_needed = true;
}
done:
if (needs_close) {
- close_other_side_locked(exec_ctx, s, "read_state_machine");
+ close_other_side_locked(exec_ctx, s, "op_state_machine");
close_stream_locked(exec_ctx, s);
}
gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(new_err);
}
-static grpc_closure do_nothing_closure;
-
static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error) {
bool ret = false; // was the cancel accepted
@@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (s->cancel_self_error == GRPC_ERROR_NONE) {
ret = true;
s->cancel_self_error = GRPC_ERROR_REF(error);
- if (s->reads_needed) {
- if (!s->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure,
- GRPC_ERROR_REF(s->cancel_self_error));
- s->read_closure_scheduled = true;
- }
- s->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error);
// Send trailing md to the other side indicating cancellation, even if we
// already have
s->trailing_md_sent = true;
@@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
}
- if (other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
- GRPC_ERROR_REF(other->cancel_other_error));
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, other,
+ other->cancel_other_error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
}
@@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "cancel_stream %p scheduling trailing-md-on-complete %p", s,
- s->cancel_self_error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_REF(s->cancel_self_error));
+ complete_if_batch_end_locked(
+ exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op,
+ "cancel_stream scheduling trailing-md-on-complete");
s->recv_trailing_md_op = NULL;
}
}
@@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// already self-canceled so still give it an error
error = GRPC_ERROR_REF(s->cancel_self_error);
} else {
- INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s,
+ INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s,
+ s->t->is_client ? "client" : "server",
op->send_initial_metadata ? " send_initial_metadata" : "",
op->send_message ? " send_message" : "",
op->send_trailing_metadata ? " send_trailing_metadata" : "",
@@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
bool needs_close = false;
+ inproc_stream *other = s->other_side;
if (error == GRPC_ERROR_NONE &&
- (op->send_initial_metadata || op->send_message ||
- op->send_trailing_metadata)) {
- inproc_stream *other = s->other_side;
+ (op->send_initial_metadata || op->send_trailing_metadata)) {
if (s->t->is_closed) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
}
@@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->initial_md_sent = true;
}
}
- }
- if (error == GRPC_ERROR_NONE && op->send_message) {
- size_t remaining = op->payload->send_message.send_message->length;
- grpc_slice_buffer *dest = slice_buffer_list_append(
- (other == NULL) ? &s->write_buffer_message : &other->to_read_message);
- do {
- grpc_slice message_slice;
- grpc_closure unused;
- GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
- op->payload->send_message.send_message,
- SIZE_MAX, &unused));
- error = grpc_byte_stream_pull(
- exec_ctx, op->payload->send_message.send_message, &message_slice);
- if (error != GRPC_ERROR_NONE) {
- cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
- break;
- }
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- remaining -= GRPC_SLICE_LENGTH(message_slice);
- grpc_slice_buffer_add(dest, message_slice);
- } while (remaining != 0);
- grpc_byte_stream_destroy(exec_ctx,
- op->payload->send_message.send_message);
- }
- if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
- grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
- : &other->to_read_trailing_md;
- bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
- : &other->to_read_trailing_md_filled;
- if (*destfilled || s->trailing_md_sent) {
- // The buffer is already in use; that's an error!
- INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
- } else {
- if (!other->closed) {
- fill_in_metadata(
- exec_ctx, s,
- op->payload->send_trailing_metadata.send_trailing_metadata, 0,
- dest, NULL, destfilled);
- }
- s->trailing_md_sent = true;
- if (!s->t->is_client && s->trailing_md_recvd &&
- s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "perform_stream_op %p scheduling trailing-md-on-complete",
- s);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_NONE);
- s->recv_trailing_md_op = NULL;
- needs_close = true;
- }
- }
- }
- if (other != NULL && other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error);
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
+ maybe_schedule_op_closure_locked(exec_ctx, other, error);
}
}
+
if (error == GRPC_ERROR_NONE &&
- (op->recv_initial_metadata || op->recv_message ||
+ (op->send_message || op->send_trailing_metadata ||
+ op->recv_initial_metadata || op->recv_message ||
op->recv_trailing_metadata)) {
- // If there are any reads, mark it so that the read closure will react to
- // them
+ // Mark ops that need to be processed by the closure
+ if (op->send_message) {
+ s->send_message_op = op;
+ }
+ if (op->send_trailing_metadata) {
+ s->send_trailing_md_op = op;
+ }
if (op->recv_initial_metadata) {
s->recv_initial_md_op = op;
}
@@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
// We want to initiate the closure if:
- // 1. There is initial metadata and something ready to take that
- // 2. There is a message and something ready to take it
- // 3. There is trailing metadata, even if nothing specifically wants
- // that because that can shut down the message as well
- if ((s->to_read_initial_md_filled && op->recv_initial_metadata) ||
- ((!slice_buffer_list_empty(&s->to_read_message) ||
- s->trailing_md_recvd) &&
- op->recv_message) ||
- (s->to_read_trailing_md_filled)) {
- if (!s->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE);
- s->read_closure_scheduled = true;
+ // 1. We want to send a message and the other side wants to receive or end
+ // 2. We want to send trailing metadata and there isn't an unmatched send
+ // 3. We want initial metadata and the other side has sent it
+ // 4. We want to receive a message and there is a message ready
+ // 5. There is trailing metadata, even if nothing specifically wants
+ // that because that can shut down the receive message as well
+ if ((op->send_message && other && ((other->recv_message_op != NULL) ||
+ (other->recv_trailing_md_op != NULL))) ||
+ (op->send_trailing_metadata && !op->send_message) ||
+ (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
+ (op->recv_message && (other && other->send_message_op != NULL)) ||
+ (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
+ if (!s->op_closure_scheduled) {
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE);
+ s->op_closure_scheduled = true;
}
} else {
- s->reads_needed = true;
+ s->ops_needed = true;
}
} else {
if (error != GRPC_ERROR_NONE) {
- // Schedule op's read closures that we didn't push to read state machine
+ // Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
INPROC_LOG(
GPR_DEBUG,
diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc
index 5bd7884e28..5d737c56cb 100644
--- a/src/core/lib/debug/stats_data.cc
+++ b/src/core/lib/debug/stats_data.cc
@@ -104,6 +104,10 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"combiner_locks_scheduled_items",
"combiner_locks_scheduled_final_items",
"combiner_locks_offloaded",
+ "call_combiner_locks_initiated",
+ "call_combiner_locks_scheduled_items",
+ "call_combiner_set_notify_on_cancel",
+ "call_combiner_cancelled",
"executor_scheduled_short_items",
"executor_scheduled_long_items",
"executor_scheduled_to_self",
@@ -112,6 +116,9 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"executor_push_retries",
"server_requested_calls",
"server_slowpath_requests_queued",
+ "cq_ev_queue_trylock_failures",
+ "cq_ev_queue_trylock_successes",
+ "cq_ev_queue_transient_pop_failures",
};
const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of client side calls created by this process",
@@ -210,6 +217,11 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of items scheduled against combiner locks",
"Number of final items scheduled against combiner locks",
"Number of combiner locks offloaded to different threads",
+ "Number of call combiner lock entries by process (first items queued to a "
+ "call combiner)",
+ "Number of items scheduled against call combiner locks",
+ "Number of times a cancellation callback was set on a call combiner",
+ "Number of times a call combiner was cancelled",
"Number of finite runtime closures scheduled against the executor (gRPC "
"thread pool)",
"Number of potentially infinite runtime closures scheduled against the "
@@ -222,6 +234,12 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"How many calls were requested (not necessarily received) by the server",
"How many times was the server slow path taken (indicates too few "
"outstanding requests)",
+ "Number of lock (trylock) acquisition failures on completion queue event "
+ "queue. High value here indicates high contention on completion queues",
+ "Number of lock (trylock) acquisition successes on completion queue event "
+ "queue.",
+ "Number of times NULL was popped out of completion queue's event queue "
+ "even though the event queue was not empty",
};
const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
"call_initial_size",
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index d8e4e7d264..031942df5c 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -110,6 +110,10 @@ typedef enum {
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED,
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED,
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS,
+ GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL,
+ GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF,
@@ -118,6 +122,9 @@ typedef enum {
GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES,
GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS,
GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED,
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES,
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES,
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES,
GRPC_STATS_COUNTER_COUNT
} grpc_stats_counters;
extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT];
@@ -404,6 +411,17 @@ typedef enum {
#define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED)
+#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED)
+#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS)
+#define GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL)
+#define GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED)
#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS)
@@ -425,6 +443,15 @@ typedef enum {
#define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES)
#define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \
grpc_stats_inc_call_initial_size((exec_ctx), (int)(value))
void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x);
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index 5c0ab2262e..af4553028e 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -245,6 +245,16 @@
doc: Number of final items scheduled against combiner locks
- counter: combiner_locks_offloaded
doc: Number of combiner locks offloaded to different threads
+# call combiner locks
+- counter: call_combiner_locks_initiated
+ doc: Number of call combiner lock entries by process
+ (first items queued to a call combiner)
+- counter: call_combiner_locks_scheduled_items
+ doc: Number of items scheduled against call combiner locks
+- counter: call_combiner_set_notify_on_cancel
+ doc: Number of times a cancellation callback was set on a call combiner
+- counter: call_combiner_cancelled
+ doc: Number of times a call combiner was cancelled
# executor
- counter: executor_scheduled_short_items
doc: Number of finite runtime closures scheduled against the executor
@@ -272,4 +282,13 @@
- counter: server_slowpath_requests_queued
doc: How many times was the server slow path taken (indicates too few
outstanding requests)
-
+# cq
+- counter: cq_ev_queue_trylock_failures
+ doc: Number of lock (trylock) acquisition failures on completion queue event
+ queue. High value here indicates high contention on completion queues
+- counter: cq_ev_queue_trylock_successes
+ doc: Number of lock (trylock) acquisition successes on completion queue event
+ queue.
+- counter: cq_ev_queue_transient_pop_failures
+ doc: Number of times NULL was popped out of completion queue's event queue
+ even though the event queue was not empty
diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql
index 54869977b0..04b6d471f6 100644
--- a/src/core/lib/debug/stats_data_bq_schema.sql
+++ b/src/core/lib/debug/stats_data_bq_schema.sql
@@ -79,6 +79,10 @@ combiner_locks_initiated_per_iteration:FLOAT,
combiner_locks_scheduled_items_per_iteration:FLOAT,
combiner_locks_scheduled_final_items_per_iteration:FLOAT,
combiner_locks_offloaded_per_iteration:FLOAT,
+call_combiner_locks_initiated_per_iteration:FLOAT,
+call_combiner_locks_scheduled_items_per_iteration:FLOAT,
+call_combiner_set_notify_on_cancel_per_iteration:FLOAT,
+call_combiner_cancelled_per_iteration:FLOAT,
executor_scheduled_short_items_per_iteration:FLOAT,
executor_scheduled_long_items_per_iteration:FLOAT,
executor_scheduled_to_self_per_iteration:FLOAT,
@@ -86,4 +90,7 @@ executor_wakeup_initiated_per_iteration:FLOAT,
executor_queue_drained_per_iteration:FLOAT,
executor_push_retries_per_iteration:FLOAT,
server_requested_calls_per_iteration:FLOAT,
-server_slowpath_requests_queued_per_iteration:FLOAT
+server_slowpath_requests_queued_per_iteration:FLOAT,
+cq_ev_queue_trylock_failures_per_iteration:FLOAT,
+cq_ev_queue_trylock_successes_per_iteration:FLOAT,
+cq_ev_queue_transient_pop_failures_per_iteration:FLOAT
diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc
index ef6c4a509b..d832dacb69 100644
--- a/src/core/lib/http/httpcli_security_connector.cc
+++ b/src/core/lib/http/httpcli_security_connector.cc
@@ -91,8 +91,17 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
tsi_peer_destruct(&peer);
}
+static int httpcli_ssl_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ grpc_httpcli_ssl_channel_security_connector *c1 =
+ (grpc_httpcli_ssl_channel_security_connector *)sc1;
+ grpc_httpcli_ssl_channel_security_connector *c2 =
+ (grpc_httpcli_ssl_channel_security_connector *)sc2;
+ return strcmp(c1->secure_peer_name, c2->secure_peer_name);
+}
+
static grpc_security_connector_vtable httpcli_ssl_vtable = {
- httpcli_ssl_destroy, httpcli_ssl_check_peer};
+ httpcli_ssl_destroy, httpcli_ssl_check_peer, httpcli_ssl_cmp};
static grpc_security_status httpcli_ssl_channel_security_connector_create(
grpc_exec_ctx *exec_ctx, const char *pem_root_certs,
@@ -123,6 +132,10 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
*sc = NULL;
return GRPC_SECURITY_ERROR;
}
+ // We don't actually need a channel credentials object in this case,
+ // but we set it to a non-NULL address so that we don't trigger
+ // assertions in grpc_channel_security_connector_cmp().
+ c->base.channel_creds = (grpc_channel_credentials *)1;
c->base.add_handshakers = httpcli_ssl_add_handshakers;
*sc = &c->base;
return GRPC_SECURITY_OK;
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc
index bab3df021a..d45719608b 100644
--- a/src/core/lib/iomgr/call_combiner.cc
+++ b/src/core/lib/iomgr/call_combiner.cc
@@ -21,6 +21,8 @@
#include <inttypes.h>
#include <grpc/support/log.h>
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/profiling/timers.h"
grpc_tracer_flag grpc_call_combiner_trace =
GRPC_TRACER_INITIALIZER(false, "call_combiner");
@@ -60,6 +62,7 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
grpc_closure* closure,
grpc_error* error DEBUG_ARGS,
const char* reason) {
+ GPR_TIMER_BEGIN("call_combiner_start", 0);
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG,
"==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
@@ -73,7 +76,10 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
prev_size + 1);
}
+ GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx);
if (prev_size == 0) {
+ GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx);
+ GPR_TIMER_MARK("call_combiner_initiate", 0);
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
}
@@ -87,11 +93,13 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
closure->error_data.error = error;
gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
}
+ GPR_TIMER_END("call_combiner_start", 0);
}
void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner DEBUG_ARGS,
const char* reason) {
+ GPR_TIMER_BEGIN("call_combiner_stop", 0);
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG,
"==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
@@ -130,11 +138,13 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
} else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG, " queue empty");
}
+ GPR_TIMER_END("call_combiner_stop", 0);
}
void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_closure* closure) {
+ GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx);
while (true) {
// Decode original state.
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
@@ -179,6 +189,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_error* error) {
+ GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx);
while (true) {
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
grpc_error* original_error = decode_cancel_state_error(original_state);
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 0e707ef839..53f4b7eaa7 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -165,6 +165,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
lock, cl, last));
if (last == 1) {
GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx);
+ GPR_TIMER_MARK("combiner.initiated", 0);
gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
(gpr_atm)exec_ctx);
// first element on this list: add it to the list of combiner locks
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 42033d0ba4..1cc6d98491 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -109,6 +109,16 @@
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
+#elif defined(GPR_OPENBSD)
+#define GRPC_HAVE_IFADDRS 1
+#define GRPC_HAVE_IPV6_RECVPKTINFO 1
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc
index ab9d60481c..0ae7d7f600 100644
--- a/src/core/lib/profiling/basic_timers.cc
+++ b/src/core/lib/profiling/basic_timers.cc
@@ -209,9 +209,9 @@ static void init_output() {
static void rotate_log() {
/* Using malloc here, as this code could end up being called by gpr_malloc */
- gpr_timer_log *new = malloc(sizeof(*new));
+ gpr_timer_log *log = static_cast<gpr_timer_log *>(malloc(sizeof(*log)));
gpr_once_init(&g_once_init, init_output);
- new->num_entries = 0;
+ log->num_entries = 0;
pthread_mutex_lock(&g_mu);
if (g_thread_log != NULL) {
timer_log_remove(&g_in_progress_logs, g_thread_log);
@@ -221,9 +221,9 @@ static void rotate_log() {
} else {
g_thread_id = g_next_thread_id++;
}
- timer_log_push_back(&g_in_progress_logs, new);
+ timer_log_push_back(&g_in_progress_logs, log);
pthread_mutex_unlock(&g_mu);
- g_thread_log = new;
+ g_thread_log = log;
}
static void gpr_timers_log_add(const char *tagstr, marker_type type,
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc
index 795ca0660a..cf10bf24c8 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.cc
+++ b/src/core/lib/security/credentials/fake/fake_credentials.cc
@@ -38,7 +38,8 @@ static grpc_security_status fake_transport_security_create_security_connector(
grpc_call_credentials *call_creds, const char *target,
const grpc_channel_args *args, grpc_channel_security_connector **sc,
grpc_channel_args **new_args) {
- *sc = grpc_fake_channel_security_connector_create(call_creds, target, args);
+ *sc =
+ grpc_fake_channel_security_connector_create(c, call_creds, target, args);
return GRPC_SECURITY_OK;
}
@@ -46,7 +47,7 @@ static grpc_security_status
fake_transport_security_server_create_security_connector(
grpc_exec_ctx *exec_ctx, grpc_server_credentials *c,
grpc_server_security_connector **sc) {
- *sc = grpc_fake_server_security_connector_create();
+ *sc = grpc_fake_server_security_connector_create(c);
return GRPC_SECURITY_OK;
}
diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc
index 9df69a2a3d..290336adc0 100644
--- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc
+++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc
@@ -62,7 +62,8 @@ static grpc_security_status ssl_create_security_connector(
}
}
status = grpc_ssl_channel_security_connector_create(
- exec_ctx, call_creds, &c->config, target, overridden_target_name, sc);
+ exec_ctx, creds, call_creds, &c->config, target, overridden_target_name,
+ sc);
if (status != GRPC_SECURITY_OK) {
return status;
}
@@ -128,7 +129,8 @@ static grpc_security_status ssl_server_create_security_connector(
grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds,
grpc_server_security_connector **sc) {
grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds;
- return grpc_ssl_server_security_connector_create(exec_ctx, &c->config, sc);
+ return grpc_ssl_server_security_connector_create(exec_ctx, creds, &c->config,
+ sc);
}
static grpc_server_credentials_vtable ssl_server_vtable = {
diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc
index 51844fb91f..80d9a7b77f 100644
--- a/src/core/lib/security/transport/security_connector.cc
+++ b/src/core/lib/security/transport/security_connector.cc
@@ -136,6 +136,39 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
}
}
+int grpc_security_connector_cmp(grpc_security_connector *sc,
+ grpc_security_connector *other) {
+ if (sc == NULL || other == NULL) return GPR_ICMP(sc, other);
+ int c = GPR_ICMP(sc->vtable, other->vtable);
+ if (c != 0) return c;
+ return sc->vtable->cmp(sc, other);
+}
+
+int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1,
+ grpc_channel_security_connector *sc2) {
+ GPR_ASSERT(sc1->channel_creds != NULL);
+ GPR_ASSERT(sc2->channel_creds != NULL);
+ int c = GPR_ICMP(sc1->channel_creds, sc2->channel_creds);
+ if (c != 0) return c;
+ c = GPR_ICMP(sc1->request_metadata_creds, sc2->request_metadata_creds);
+ if (c != 0) return c;
+ c = GPR_ICMP((void *)sc1->check_call_host, (void *)sc2->check_call_host);
+ if (c != 0) return c;
+ c = GPR_ICMP((void *)sc1->cancel_check_call_host,
+ (void *)sc2->cancel_check_call_host);
+ if (c != 0) return c;
+ return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers);
+}
+
+int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1,
+ grpc_server_security_connector *sc2) {
+ GPR_ASSERT(sc1->server_creds != NULL);
+ GPR_ASSERT(sc2->server_creds != NULL);
+ int c = GPR_ICMP(sc1->server_creds, sc2->server_creds);
+ if (c != 0) return c;
+ return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers);
+}
+
bool grpc_channel_security_connector_check_call_host(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
const char *host, grpc_auth_context *auth_context,
@@ -199,25 +232,27 @@ void grpc_security_connector_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&sc->refcount)) sc->vtable->destroy(exec_ctx, sc);
}
-static void connector_pointer_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) {
+static void connector_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) {
GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, (grpc_security_connector *)p,
- "connector_pointer_arg_destroy");
+ "connector_arg_destroy");
}
-static void *connector_pointer_arg_copy(void *p) {
+static void *connector_arg_copy(void *p) {
return GRPC_SECURITY_CONNECTOR_REF((grpc_security_connector *)p,
- "connector_pointer_arg_copy");
+ "connector_arg_copy");
}
-static int connector_pointer_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
+static int connector_cmp(void *a, void *b) {
+ return grpc_security_connector_cmp((grpc_security_connector *)a,
+ (grpc_security_connector *)b);
+}
-static const grpc_arg_pointer_vtable connector_pointer_vtable = {
- connector_pointer_arg_copy, connector_pointer_arg_destroy,
- connector_pointer_cmp};
+static const grpc_arg_pointer_vtable connector_arg_vtable = {
+ connector_arg_copy, connector_arg_destroy, connector_cmp};
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) {
return grpc_channel_arg_pointer_create((char *)GRPC_ARG_SECURITY_CONNECTOR,
- sc, &connector_pointer_vtable);
+ sc, &connector_arg_vtable);
}
grpc_security_connector *grpc_security_connector_from_arg(const grpc_arg *arg) {
@@ -382,6 +417,32 @@ static void fake_server_check_peer(grpc_exec_ctx *exec_ctx,
fake_check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked);
}
+static int fake_channel_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ grpc_fake_channel_security_connector *c1 =
+ (grpc_fake_channel_security_connector *)sc1;
+ grpc_fake_channel_security_connector *c2 =
+ (grpc_fake_channel_security_connector *)sc2;
+ int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base);
+ if (c != 0) return c;
+ c = strcmp(c1->target, c2->target);
+ if (c != 0) return c;
+ if (c1->expected_targets == NULL || c2->expected_targets == NULL) {
+ c = GPR_ICMP(c1->expected_targets, c2->expected_targets);
+ } else {
+ c = strcmp(c1->expected_targets, c2->expected_targets);
+ }
+ if (c != 0) return c;
+ return GPR_ICMP(c1->is_lb_channel, c2->is_lb_channel);
+}
+
+static int fake_server_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ return grpc_server_security_connector_cmp(
+ (grpc_server_security_connector *)sc1,
+ (grpc_server_security_connector *)sc2);
+}
+
static bool fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
const char *host,
@@ -418,12 +479,13 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx,
}
static grpc_security_connector_vtable fake_channel_vtable = {
- fake_channel_destroy, fake_channel_check_peer};
+ fake_channel_destroy, fake_channel_check_peer, fake_channel_cmp};
static grpc_security_connector_vtable fake_server_vtable = {
- fake_server_destroy, fake_server_check_peer};
+ fake_server_destroy, fake_server_check_peer, fake_server_cmp};
grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
+ grpc_channel_credentials *channel_creds,
grpc_call_credentials *request_metadata_creds, const char *target,
const grpc_channel_args *args) {
grpc_fake_channel_security_connector *c =
@@ -431,6 +493,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
gpr_ref_init(&c->base.base.refcount, 1);
c->base.base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
c->base.base.vtable = &fake_channel_vtable;
+ c->base.channel_creds = channel_creds;
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = fake_channel_check_call_host;
@@ -444,13 +507,14 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
}
grpc_server_security_connector *grpc_fake_server_security_connector_create(
- void) {
+ grpc_server_credentials *server_creds) {
grpc_server_security_connector *c =
(grpc_server_security_connector *)gpr_zalloc(
sizeof(grpc_server_security_connector));
gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &fake_server_vtable;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
+ c->server_creds = server_creds;
c->add_handshakers = fake_server_add_handshakers;
return c;
}
@@ -473,6 +537,7 @@ static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
+ grpc_channel_credentials_unref(exec_ctx, c->base.channel_creds);
grpc_call_credentials_unref(exec_ctx, c->base.request_metadata_creds);
tsi_ssl_client_handshaker_factory_unref(c->client_handshaker_factory);
c->client_handshaker_factory = NULL;
@@ -485,6 +550,7 @@ static void ssl_server_destroy(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
+ grpc_server_credentials_unref(exec_ctx, c->base.server_creds);
tsi_ssl_server_handshaker_factory_unref(c->server_handshaker_factory);
c->server_handshaker_factory = NULL;
gpr_free(sc);
@@ -641,6 +707,29 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx,
GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error);
}
+static int ssl_channel_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ grpc_ssl_channel_security_connector *c1 =
+ (grpc_ssl_channel_security_connector *)sc1;
+ grpc_ssl_channel_security_connector *c2 =
+ (grpc_ssl_channel_security_connector *)sc2;
+ int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base);
+ if (c != 0) return c;
+ c = strcmp(c1->target_name, c2->target_name);
+ if (c != 0) return c;
+ return (c1->overridden_target_name == NULL ||
+ c2->overridden_target_name == NULL)
+ ? GPR_ICMP(c1->overridden_target_name, c2->overridden_target_name)
+ : strcmp(c1->overridden_target_name, c2->overridden_target_name);
+}
+
+static int ssl_server_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ return grpc_server_security_connector_cmp(
+ (grpc_server_security_connector *)sc1,
+ (grpc_server_security_connector *)sc2);
+}
+
static void add_shallow_auth_property_to_peer(tsi_peer *peer,
const grpc_auth_property *prop,
const char *tsi_prop_name) {
@@ -717,10 +806,10 @@ static void ssl_channel_cancel_check_call_host(
}
static grpc_security_connector_vtable ssl_channel_vtable = {
- ssl_channel_destroy, ssl_channel_check_peer};
+ ssl_channel_destroy, ssl_channel_check_peer, ssl_channel_cmp};
static grpc_security_connector_vtable ssl_server_vtable = {
- ssl_server_destroy, ssl_server_check_peer};
+ ssl_server_destroy, ssl_server_check_peer, ssl_server_cmp};
/* returns a NULL terminated slice. */
static grpc_slice compute_default_pem_root_certs_once(void) {
@@ -804,7 +893,8 @@ const char *grpc_get_default_ssl_roots(void) {
}
grpc_security_status grpc_ssl_channel_security_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds,
+ grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds,
+ grpc_call_credentials *request_metadata_creds,
const grpc_ssl_config *config, const char *target_name,
const char *overridden_target_name, grpc_channel_security_connector **sc) {
size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
@@ -840,6 +930,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
gpr_ref_init(&c->base.base.refcount, 1);
c->base.base.vtable = &ssl_channel_vtable;
c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
+ c->base.channel_creds = grpc_channel_credentials_ref(channel_creds);
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = ssl_channel_check_call_host;
@@ -874,8 +965,8 @@ error:
}
grpc_security_status grpc_ssl_server_security_connector_create(
- grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config,
- grpc_server_security_connector **sc) {
+ grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds,
+ const grpc_ssl_server_config *config, grpc_server_security_connector **sc) {
size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
const char **alpn_protocol_strings =
(const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols);
@@ -897,6 +988,7 @@ grpc_security_status grpc_ssl_server_security_connector_create(
gpr_ref_init(&c->base.base.refcount, 1);
c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
c->base.base.vtable = &ssl_server_vtable;
+ c->base.server_creds = grpc_server_credentials_ref(server_creds);
result = tsi_create_ssl_server_handshaker_factory_ex(
config->pem_key_cert_pairs, config->num_key_cert_pairs,
config->pem_root_certs, get_tsi_client_certificate_request_type(
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index 4d87cd0c80..216bb35e81 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -60,13 +60,9 @@ typedef struct {
void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc,
tsi_peer peer, grpc_auth_context **auth_context,
grpc_closure *on_peer_checked);
+ int (*cmp)(grpc_security_connector *sc, grpc_security_connector *other);
} grpc_security_connector_vtable;
-typedef struct grpc_security_connector_handshake_list {
- void *handshake;
- struct grpc_security_connector_handshake_list *next;
-} grpc_security_connector_handshake_list;
-
struct grpc_security_connector {
const grpc_security_connector_vtable *vtable;
gpr_refcount refcount;
@@ -104,6 +100,10 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_auth_context **auth_context,
grpc_closure *on_peer_checked);
+/* Compares two security connectors. */
+int grpc_security_connector_cmp(grpc_security_connector *sc,
+ grpc_security_connector *other);
+
/* Util to encapsulate the connector in a channel arg. */
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc);
@@ -116,13 +116,14 @@ grpc_security_connector *grpc_security_connector_find_in_args(
/* --- channel_security_connector object. ---
- A channel security connector object represents away to configure the
+ A channel security connector object represents a way to configure the
underlying transport security mechanism on the client side. */
typedef struct grpc_channel_security_connector grpc_channel_security_connector;
struct grpc_channel_security_connector {
grpc_security_connector base;
+ grpc_channel_credentials *channel_creds;
grpc_call_credentials *request_metadata_creds;
bool (*check_call_host)(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc, const char *host,
@@ -138,6 +139,10 @@ struct grpc_channel_security_connector {
grpc_handshake_manager *handshake_mgr);
};
+/// A helper function for use in grpc_security_connector_cmp() implementations.
+int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1,
+ grpc_channel_security_connector *sc2);
+
/// Checks that the host that will be set for a call is acceptable.
/// Returns true if completed synchronously, in which case \a error will
/// be set to indicate the result. Otherwise, \a on_call_host_checked
@@ -161,18 +166,23 @@ void grpc_channel_security_connector_add_handshakers(
/* --- server_security_connector object. ---
- A server security connector object represents away to configure the
+ A server security connector object represents a way to configure the
underlying transport security mechanism on the server side. */
typedef struct grpc_server_security_connector grpc_server_security_connector;
struct grpc_server_security_connector {
grpc_security_connector base;
+ grpc_server_credentials *server_creds;
void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr);
};
+/// A helper function for use in grpc_security_connector_cmp() implementations.
+int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1,
+ grpc_server_security_connector *sc2);
+
void grpc_server_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr);
@@ -182,13 +192,14 @@ void grpc_server_security_connector_add_handshakers(
/* For TESTING ONLY!
Creates a fake connector that emulates real channel security. */
grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
+ grpc_channel_credentials *channel_creds,
grpc_call_credentials *request_metadata_creds, const char *target,
const grpc_channel_args *args);
/* For TESTING ONLY!
Creates a fake connector that emulates real server security. */
grpc_server_security_connector *grpc_fake_server_security_connector_create(
- void);
+ grpc_server_credentials *server_creds);
/* Config for ssl clients. */
@@ -211,7 +222,8 @@ typedef struct {
specific error code otherwise.
*/
grpc_security_status grpc_ssl_channel_security_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds,
+ grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds,
+ grpc_call_credentials *request_metadata_creds,
const grpc_ssl_config *config, const char *target_name,
const char *overridden_target_name, grpc_channel_security_connector **sc);
@@ -236,8 +248,8 @@ typedef struct {
specific error code otherwise.
*/
grpc_security_status grpc_ssl_server_security_connector_create(
- grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config,
- grpc_server_security_connector **sc);
+ grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds,
+ const grpc_ssl_server_config *config, grpc_server_security_connector **sc);
/* Util. */
const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
diff --git a/src/core/lib/support/manual_constructor.h b/src/core/lib/support/manual_constructor.h
new file mode 100644
index 0000000000..d753cf98a0
--- /dev/null
+++ b/src/core/lib/support/manual_constructor.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_MANUAL_CONSTRUCTOR_H
+#define GRPC_CORE_LIB_SUPPORT_MANUAL_CONSTRUCTOR_H
+
+// manually construct a region of memory with some type
+
+#include <stddef.h>
+#include <new>
+#include <type_traits>
+#include <utility>
+
+namespace grpc_core {
+
+template <typename Type>
+class ManualConstructor {
+ public:
+ // No constructor or destructor because one of the most useful uses of
+ // this class is as part of a union, and members of a union could not have
+ // constructors or destructors till C++11. And, anyway, the whole point of
+ // this class is to bypass constructor and destructor.
+
+ Type* get() { return reinterpret_cast<Type*>(&space_); }
+ const Type* get() const { return reinterpret_cast<const Type*>(&space_); }
+
+ Type* operator->() { return get(); }
+ const Type* operator->() const { return get(); }
+
+ Type& operator*() { return *get(); }
+ const Type& operator*() const { return *get(); }
+
+ void Init() { new (&space_) Type; }
+
+ // Init() constructs the Type instance using the given arguments
+ // (which are forwarded to Type's constructor).
+ //
+ // Note that Init() with no arguments performs default-initialization,
+ // not zero-initialization (i.e it behaves the same as "new Type;", not
+ // "new Type();"), so it will leave non-class types uninitialized.
+ template <typename... Ts>
+ void Init(Ts&&... args) {
+ new (&space_) Type(std::forward<Ts>(args)...);
+ }
+
+ // Init() that is equivalent to copy and move construction.
+ // Enables usage like this:
+ // ManualConstructor<std::vector<int>> v;
+ // v.Init({1, 2, 3});
+ void Init(const Type& x) { new (&space_) Type(x); }
+ void Init(Type&& x) { new (&space_) Type(std::move(x)); }
+
+ void Destroy() { get()->~Type(); }
+
+ private:
+ typename std::aligned_storage<sizeof(Type), alignof(Type)>::type space_;
+};
+
+} // namespace grpc_core
+
+#endif
diff --git a/src/core/lib/support/time_posix.cc b/src/core/lib/support/time_posix.cc
index 3267ea6b54..3f8a9094fd 100644
--- a/src/core/lib/support/time_posix.cc
+++ b/src/core/lib/support/time_posix.cc
@@ -42,7 +42,7 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) {
return rv;
}
-#if _POSIX_TIMERS > 0
+#if _POSIX_TIMERS > 0 || defined(__OpenBSD__)
static gpr_timespec gpr_from_timespec(struct timespec ts,
gpr_clock_type clock_type) {
/*
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 36b4b835f8..21664f03c8 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -362,11 +362,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
grpc_cq_completion *c = NULL;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
if (gpr_spinlock_trylock(&q->queue_lock)) {
- c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
+
+ bool is_empty = false;
+ c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
+
+ if (c == NULL && !is_empty) {
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
+ }
+ } else {
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
}
+ grpc_exec_ctx_finish(&exec_ctx);
+
if (c) {
gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
}
diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc
index 6ed427ce5c..f1597014b1 100644
--- a/src/core/lib/transport/bdp_estimator.cc
+++ b/src/core/lib/transport/bdp_estimator.cc
@@ -21,117 +21,64 @@
#include <inttypes.h>
#include <stdlib.h>
-#include <grpc/support/log.h>
#include <grpc/support/useful.h>
grpc_tracer_flag grpc_bdp_estimator_trace =
GRPC_TRACER_INITIALIZER(false, "bdp_estimator");
-void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
- estimator->estimate = 65536;
- estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
- estimator->ping_start_time = gpr_time_0(GPR_CLOCK_MONOTONIC);
- estimator->next_ping_scheduled = 0;
- estimator->name = name;
- estimator->bw_est = 0;
- estimator->inter_ping_delay = 100.0; // start at 100ms
- estimator->stable_estimate_count = 0;
-}
-
-bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
- int64_t *estimate) {
- *estimate = estimator->estimate;
- return true;
-}
-
-bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator,
- double *bw) {
- *bw = estimator->bw_est;
- return true;
-}
-
-void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
- int64_t num_bytes) {
- estimator->accumulator += num_bytes;
-}
-
-bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx,
- const grpc_bdp_estimator *estimator) {
- switch (estimator->ping_state) {
- case GRPC_BDP_PING_UNSCHEDULED:
- return grpc_exec_ctx_now(exec_ctx) >= estimator->next_ping_scheduled;
- case GRPC_BDP_PING_SCHEDULED:
- return false;
- case GRPC_BDP_PING_STARTED:
- return false;
- }
- GPR_UNREACHABLE_CODE(return false);
-}
+namespace grpc_core {
-void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) {
- if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64,
- estimator->name, estimator->accumulator, estimator->estimate);
- }
- GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED);
- estimator->ping_state = GRPC_BDP_PING_SCHEDULED;
- estimator->accumulator = 0;
-}
+BdpEstimator::BdpEstimator(const char *name)
+ : ping_state_(PingState::UNSCHEDULED),
+ accumulator_(0),
+ estimate_(65536),
+ ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)),
+ inter_ping_delay_(100.0), // start at 100ms
+ stable_estimate_count_(0),
+ bw_est_(0),
+ name_(name) {}
-void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
- if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64,
- estimator->name, estimator->accumulator, estimator->estimate);
- }
- GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
- estimator->ping_state = GRPC_BDP_PING_STARTED;
- estimator->accumulator = 0;
- estimator->ping_start_time = gpr_now(GPR_CLOCK_MONOTONIC);
-}
-
-void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx,
- grpc_bdp_estimator *estimator) {
+grpc_millis BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- gpr_timespec dt_ts = gpr_time_sub(now, estimator->ping_start_time);
+ gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_);
double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec;
- double bw = dt > 0 ? ((double)estimator->accumulator / dt) : 0;
- int start_inter_ping_delay = estimator->inter_ping_delay;
+ double bw = dt > 0 ? ((double)accumulator_ / dt) : 0;
+ int start_inter_ping_delay = inter_ping_delay_;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64
" dt=%lf bw=%lfMbs bw_est=%lfMbs",
- estimator->name, estimator->accumulator, estimator->estimate, dt,
- bw / 125000.0, estimator->bw_est / 125000.0);
+ name_, accumulator_, estimate_, dt, bw / 125000.0,
+ bw_est_ / 125000.0);
}
- GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED);
- if (estimator->accumulator > 2 * estimator->estimate / 3 &&
- bw > estimator->bw_est) {
- estimator->estimate =
- GPR_MAX(estimator->accumulator, estimator->estimate * 2);
- estimator->bw_est = bw;
+ GPR_ASSERT(ping_state_ == PingState::STARTED);
+ if (accumulator_ > 2 * estimate_ / 3 && bw > bw_est_) {
+ estimate_ = GPR_MAX(accumulator_, estimate_ * 2);
+ bw_est_ = bw;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64,
- estimator->name, estimator->estimate);
+ gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, name_,
+ estimate_);
}
- estimator->inter_ping_delay /= 2; // if the ping estimate changes,
- // exponentially get faster at probing
- } else if (estimator->inter_ping_delay < 10000) {
- estimator->stable_estimate_count++;
- if (estimator->stable_estimate_count >= 2) {
- estimator->inter_ping_delay +=
+ inter_ping_delay_ /= 2; // if the ping estimate changes,
+ // exponentially get faster at probing
+ } else if (inter_ping_delay_ < 10000) {
+ stable_estimate_count_++;
+ if (stable_estimate_count_ >= 2) {
+ inter_ping_delay_ +=
100 +
(int)(rand() * 100.0 / RAND_MAX); // if the ping estimate is steady,
// slowly ramp down the probe time
}
}
- if (start_inter_ping_delay != estimator->inter_ping_delay) {
- estimator->stable_estimate_count = 0;
+ if (start_inter_ping_delay != inter_ping_delay_) {
+ stable_estimate_count_ = 0;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", estimator->name,
- estimator->inter_ping_delay);
+ gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", name_,
+ inter_ping_delay_);
}
}
- estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
- estimator->accumulator = 0;
- estimator->next_ping_scheduled =
- grpc_exec_ctx_now(exec_ctx) + estimator->inter_ping_delay;
+ ping_state_ = PingState::UNSCHEDULED;
+ accumulator_ = 0;
+ return grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_;
}
+
+} // namespace grpc_core
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index a9d986782c..470c127f7f 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -19,67 +19,83 @@
#ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
#define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
-#include <grpc/support/time.h>
+#include <grpc/support/port_platform.h>
+
+#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/exec_ctx.h"
-#define GRPC_BDP_SAMPLES 16
-#define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3
+extern grpc_tracer_flag grpc_bdp_estimator_trace;
-#ifdef __cplusplus
-extern "C" {
-#endif
+namespace grpc_core {
-extern grpc_tracer_flag grpc_bdp_estimator_trace;
+class BdpEstimator {
+ public:
+ explicit BdpEstimator(const char *name);
+ ~BdpEstimator() {}
-typedef enum {
- GRPC_BDP_PING_UNSCHEDULED,
- GRPC_BDP_PING_SCHEDULED,
- GRPC_BDP_PING_STARTED
-} grpc_bdp_estimator_ping_state;
+ // Returns true if a reasonable estimate could be obtained
+ bool EstimateBdp(int64_t *estimate_out) const {
+ *estimate_out = estimate_;
+ return true;
+ }
+ bool EstimateBandwidth(double *bw_out) const {
+ *bw_out = bw_est_;
+ return true;
+ }
-typedef struct grpc_bdp_estimator {
- grpc_bdp_estimator_ping_state ping_state;
- int64_t accumulator;
- int64_t estimate;
+ void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; }
+
+ // Schedule a ping: call in response to receiving a true from
+ // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
+ // transport (but not necessarily started)
+ void SchedulePing() {
+ if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, name_,
+ accumulator_, estimate_);
+ }
+ GPR_ASSERT(ping_state_ == PingState::UNSCHEDULED);
+ ping_state_ = PingState::SCHEDULED;
+ accumulator_ = 0;
+ }
+
+ // Start a ping: call after calling grpc_bdp_estimator_schedule_ping and
+ // once
+ // the ping is on the wire
+ void StartPing() {
+ if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, name_,
+ accumulator_, estimate_);
+ }
+ GPR_ASSERT(ping_state_ == PingState::SCHEDULED);
+ ping_state_ = PingState::STARTED;
+ accumulator_ = 0;
+ ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC);
+ }
+
+ // Completes a previously started ping, returns when to schedule the next one
+ grpc_millis CompletePing(grpc_exec_ctx *exec_ctx);
+
+ private:
+ enum class PingState { UNSCHEDULED, SCHEDULED, STARTED };
+
+ PingState ping_state_;
+ int64_t accumulator_;
+ int64_t estimate_;
// when was the current ping started?
- gpr_timespec ping_start_time;
- // when should the next ping start?
- grpc_millis next_ping_scheduled;
- int inter_ping_delay;
- int stable_estimate_count;
- double bw_est;
- const char *name;
-} grpc_bdp_estimator;
-
-void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
-
-// Returns true if a reasonable estimate could be obtained
-bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
- int64_t *estimate);
-// Tracks new bytes read.
-bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw);
-// Returns true if the user should schedule a ping
-void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
- int64_t num_bytes);
-// Returns true if the user should schedule a ping
-bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx,
- const grpc_bdp_estimator *estimator);
-// Schedule a ping: call in response to receiving a true from
-// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
-// transport (but not necessarily started)
-void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator);
-// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and once
-// the ping is on the wire
-void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator);
-// Completes a previously started ping
-void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx,
- grpc_bdp_estimator *estimator);
-
-#ifdef __cplusplus
-}
-#endif
+ gpr_timespec ping_start_time_;
+ int inter_ping_delay_;
+ int stable_estimate_count_;
+ double bw_est_;
+ const char *name_;
+};
+
+} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H */
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 6bd3ecda32..d982a3d2b7 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -266,8 +266,11 @@ class Server::SyncRequestThreadManager : public ThreadManager {
WorkStatus PollForWork(void** tag, bool* ok) override {
*tag = nullptr;
+ // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working
+ // right now
gpr_timespec deadline =
- gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN);
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN));
switch (server_cq_->AsyncNext(tag, ok, deadline)) {
case CompletionQueue::TIMEOUT:
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index 087b685963..f59989655e 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -36,7 +36,21 @@ namespace Grpc.Core
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- internal AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ /// <summary>
+ /// Creates a new AsyncClientStreamingCall object with the specified properties.
+ /// </summary>
+ /// <param name="requestStream">Stream of request values.</param>
+ /// <param name="responseAsync">The response of the asynchronous call.</param>
+ /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
+ /// <param name="getStatusFunc">Delegate returning the status of the call.</param>
+ /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
+ /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
+ public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream,
+ Task<TResponse> responseAsync,
+ Task<Metadata> responseHeadersAsync,
+ Func<Status> getStatusFunc,
+ Func<Metadata> getTrailersFunc,
+ Action disposeAction)
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index ce49fb1596..1cb1a91859 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -35,7 +35,21 @@ namespace Grpc.Core
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- internal AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ /// <summary>
+ /// Creates a new AsyncDuplexStreamingCall object with the specified properties.
+ /// </summary>
+ /// <param name="requestStream">Stream of request values.</param>
+ /// <param name="responseStream">Stream of response values.</param>
+ /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
+ /// <param name="getStatusFunc">Delegate returning the status of the call.</param>
+ /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
+ /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
+ public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream,
+ IAsyncStreamReader<TResponse> responseStream,
+ Task<Metadata> responseHeadersAsync,
+ Func<Status> getStatusFunc,
+ Func<Metadata> getTrailersFunc,
+ Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index fbc97b8148..4303b0b1b0 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -33,7 +33,19 @@ namespace Grpc.Core
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- internal AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ /// <summary>
+ /// Creates a new AsyncDuplexStreamingCall object with the specified properties.
+ /// </summary>
+ /// <param name="responseStream">Stream of response values.</param>
+ /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
+ /// <param name="getStatusFunc">Delegate returning the status of the call.</param>
+ /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
+ /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
+ public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream,
+ Task<Metadata> responseHeadersAsync,
+ Func<Status> getStatusFunc,
+ Func<Metadata> getTrailersFunc,
+ Action disposeAction)
{
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
index 6348f3c5fd..17747f86ca 100644
--- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs
+++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
@@ -34,7 +34,20 @@ namespace Grpc.Core
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- internal AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+
+ /// <summary>
+ /// Creates a new AsyncUnaryCall object with the specified properties.
+ /// </summary>
+ /// <param name="responseAsync">The response of the asynchronous call.</param>
+ /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
+ /// <param name="getStatusFunc">Delegate returning the status of the call.</param>
+ /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
+ /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
+ public AsyncUnaryCall(Task<TResponse> responseAsync,
+ Task<Metadata> responseHeadersAsync,
+ Func<Status> getStatusFunc,
+ Func<Metadata> getTrailersFunc,
+ Action disposeAction)
{
this.responseAsync = responseAsync;
this.responseHeadersAsync = responseHeadersAsync;
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index 82ac2600fa..5672bdad4c 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -18,6 +18,7 @@
#import <UIKit/UIKit.h>
#import <XCTest/XCTest.h>
+#import <grpc/grpc.h>
#import <GRPCClient/GRPCCall.h>
#import <GRPCClient/GRPCCall+ChannelArg.h>
@@ -30,6 +31,8 @@
#import <RxLibrary/GRXWriter+Immediate.h>
#import <RxLibrary/GRXBufferedPipe.h>
+#import "version.h"
+
#define TEST_TIMEOUT 16
static NSString * const kHostAddress = @"localhost:5050";
@@ -266,12 +269,38 @@ static GRPCProtoMethod *kFullDuplexCallMethod;
id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
XCTAssertNotNil(value, @"nil value received as response.");
XCTAssertEqual([value length], 0, @"Non-empty response received: %@", value);
- /* This test needs to be more clever in regards to changing the version of the core.
- XCTAssertEqualObjects(call.responseHeaders[@"x-grpc-test-echo-useragent"],
- @"Foo grpc-objc/0.13.0 grpc-c/0.14.0-dev (ios)",
- @"Did not receive expected user agent %@",
- call.responseHeaders[@"x-grpc-test-echo-useragent"]);
- */
+
+ NSString *userAgent = call.responseHeaders[@"x-grpc-test-echo-useragent"];
+ NSError *error = nil;
+
+ // Test the regex is correct
+ NSString *expectedUserAgent = @"Foo grpc-objc/";
+ expectedUserAgent =
+ [expectedUserAgent stringByAppendingString:GRPC_OBJC_VERSION_STRING];
+ expectedUserAgent =
+ [expectedUserAgent stringByAppendingString:@" grpc-c/"];
+ expectedUserAgent =
+ [expectedUserAgent stringByAppendingString:GRPC_C_VERSION_STRING];
+ expectedUserAgent =
+ [expectedUserAgent stringByAppendingString:@" (ios; chttp2; "];
+ expectedUserAgent =
+ [expectedUserAgent stringByAppendingString:[NSString stringWithUTF8String:grpc_g_stands_for()]];
+ expectedUserAgent = [expectedUserAgent stringByAppendingString:@")"];
+ XCTAssertEqualObjects(userAgent, expectedUserAgent);
+
+ // Change in format of user-agent field in a direction that does not match the regex will likely
+ // cause problem for certain gRPC users. For details, refer to internal doc https://goo.gl/c2diBc
+ NSRegularExpression *regex =
+ [NSRegularExpression regularExpressionWithPattern:@" grpc-[a-zA-Z0-9]+(-[a-zA-Z0-9]+)?/[^ ,]+( \\([^)]*\\))?"
+ options:0
+ error:&error];
+ NSString *customUserAgent =
+ [regex stringByReplacingMatchesInString:userAgent
+ options:0
+ range:NSMakeRange(0, [userAgent length])
+ withTemplate:@""];
+ XCTAssertEqualObjects(customUserAgent, @"Foo");
+
[response fulfill];
} completionHandler:^(NSError *errorOrNil) {
XCTAssertNil(errorOrNil, @"Finished with unexpected error: %@", errorOrNil);
diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
index b01d5ffcea..2f0c8cfe18 100644
--- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
+++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
@@ -144,6 +144,7 @@
5EAD6D241E27047400002378 /* CronetUnitTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = CronetUnitTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
5EAD6D261E27047400002378 /* CronetUnitTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = CronetUnitTests.m; sourceTree = "<group>"; };
5EAD6D281E27047400002378 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
+ 5EAFE8271F8EFB87007F2189 /* version.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = version.h; sourceTree = "<group>"; };
5EE84BF11D4717E40050C6CC /* InteropTestsRemoteWithCronet.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = InteropTestsRemoteWithCronet.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
5EE84BF31D4717E40050C6CC /* InteropTestsRemoteWithCronet.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = InteropTestsRemoteWithCronet.m; sourceTree = "<group>"; };
5EE84BF51D4717E40050C6CC /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
@@ -398,6 +399,7 @@
635697C91B14FC11007A7283 /* Tests */ = {
isa = PBXGroup;
children = (
+ 5EAFE8271F8EFB87007F2189 /* version.h */,
6312AE4D1B1BF49B00341DEE /* GRPCClientTests.m */,
63E240CC1B6C4D3A005F3B0E /* InteropTests.h */,
635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */,
@@ -740,9 +742,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-CronetUnitTests/Pods-CronetUnitTests-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -755,9 +760,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemoteWithCronet/Pods-InteropTestsRemoteWithCronet-frameworks.sh",
+ "${PODS_ROOT}/CronetFramework/Cronet.framework",
);
name = "[CP] Embed Pods Frameworks";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -770,13 +778,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-InteropTestsRemote-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
4F5690DC0E6AD6663FE78B8B /* [CP] Embed Pods Frameworks */ = {
@@ -800,13 +811,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-InteropTestsLocalSSL-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
5F14F59509E10C2852014F9E /* [CP] Embed Pods Frameworks */ = {
@@ -830,9 +844,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsLocalSSL/Pods-InteropTestsLocalSSL-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -845,9 +862,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-CoreCronetEnd2EndTests/Pods-CoreCronetEnd2EndTests-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -860,13 +880,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-InteropTestsLocalCleartext-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
796680C7599CB4ED736DD62A /* [CP] Check Pods Manifest.lock */ = {
@@ -875,13 +898,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-Tests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
80E2DDD2EC04A4009F45E933 /* [CP] Check Pods Manifest.lock */ = {
@@ -890,13 +916,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-CronetUnitTests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
8AD3130D3C58A0FB32FF2A36 /* [CP] Copy Pods Resources */ = {
@@ -905,9 +934,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsLocalCleartext/Pods-InteropTestsLocalCleartext-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -935,13 +967,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-AllTests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
A441F71824DCB9D0CA297748 /* [CP] Copy Pods Resources */ = {
@@ -950,9 +985,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-AllTests/Pods-AllTests-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -965,9 +1003,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-CronetUnitTests/Pods-CronetUnitTests-frameworks.sh",
+ "${PODS_ROOT}/CronetFramework/Cronet.framework",
);
name = "[CP] Embed Pods Frameworks";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -995,9 +1036,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-Tests/Pods-Tests-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -1010,13 +1054,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-RxLibraryUnitTests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
C0F7B1FF6F88CC5FBF362F4C /* [CP] Check Pods Manifest.lock */ = {
@@ -1025,13 +1072,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-InteropTestsRemoteWithCronet-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
C2E09DC4BD239F71160F0CC1 /* [CP] Copy Pods Resources */ = {
@@ -1040,9 +1090,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemote/Pods-InteropTestsRemote-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -1070,9 +1123,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-RxLibraryUnitTests/Pods-RxLibraryUnitTests-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -1085,9 +1141,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemoteWithCronet/Pods-InteropTestsRemoteWithCronet-resources.sh",
+ $PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -1100,9 +1159,12 @@
files = (
);
inputPaths = (
+ "${SRCROOT}/Pods/Target Support Files/Pods-CoreCronetEnd2EndTests/Pods-CoreCronetEnd2EndTests-frameworks.sh",
+ "${PODS_ROOT}/CronetFramework/Cronet.framework",
);
name = "[CP] Embed Pods Frameworks";
outputPaths = (
+ "${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@@ -1115,13 +1177,16 @@
files = (
);
inputPaths = (
+ "${PODS_PODFILE_DIR_PATH}/Podfile.lock",
+ "${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
+ "$(DERIVED_FILE_DIR)/Pods-CoreCronetEnd2EndTests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
- shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
+ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
/* End PBXShellScriptBuildPhase section */
diff --git a/src/objective-c/tests/version.h b/src/objective-c/tests/version.h
new file mode 100644
index 0000000000..02515063fa
--- /dev/null
+++ b/src/objective-c/tests/version.h
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// This file is autogenerated from a template file. Please make
+// modifications to
+// `templates/src/objective-c/GRPCClient/private/version.h.template`
+// instead. This file can be regenerated from the template by running
+// `tools/buildgen/generate_projects.sh`.
+
+
+#define GRPC_OBJC_VERSION_STRING @"1.8.0-dev"
+#define GRPC_C_VERSION_STRING @"5.0.0-dev"
diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb
index e5ab02b9fc..9d2cf2a08a 100644
--- a/src/ruby/ext/grpc/extconf.rb
+++ b/src/ruby/ext/grpc/extconf.rb
@@ -41,6 +41,7 @@ LIB_DIRS = [
]
windows = RUBY_PLATFORM =~ /mingw|mswin/
+bsd = RUBY_PLATFORM =~ /bsd/
grpc_root = File.expand_path(File.join(File.dirname(__FILE__), '../../../..'))
@@ -70,7 +71,8 @@ unless windows
puts 'Building internal gRPC into ' + grpc_lib_dir
nproc = 4
nproc = Etc.nprocessors * 2 if Etc.respond_to? :nprocessors
- system("make -j#{nproc} -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config} Q=")
+ make = bsd ? 'gmake' : 'make'
+ system("#{make} -j#{nproc} -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config} Q=")
exit 1 unless $? == 0
end