aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c1
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c78
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c79
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h27
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c44
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c22
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h48
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c6
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_decoder.c1
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c213
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.c369
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.c10
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h281
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c84
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c19
-rw-r--r--src/core/ext/transport/chttp2/transport/varint.c4
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c72
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c3
-rw-r--r--src/core/lib/iomgr/combiner.c4
-rw-r--r--src/core/lib/iomgr/nameser.h104
-rw-r--r--src/core/lib/iomgr/port.h5
-rw-r--r--src/core/lib/json/json_reader.c1
-rw-r--r--src/core/lib/security/transport/security_handshaker.c2
-rw-r--r--src/core/lib/support/murmur_hash.c2
-rw-r--r--src/core/lib/surface/alarm.c3
-rw-r--r--src/core/lib/surface/call.c23
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c87
-rw-r--r--src/core/lib/surface/completion_queue.h5
-rw-r--r--src/core/lib/surface/server.c14
-rw-r--r--src/core/tsi/fake_transport_security.c2
-rw-r--r--src/core/tsi/transport_security.c2
-rw-r--r--src/core/tsi/transport_security.h3
-rw-r--r--src/core/tsi/transport_security_adapter.c2
-rw-r--r--src/core/tsi/transport_security_interface.h2
39 files changed, 1034 insertions, 608 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index c3dca14305..b83c95275f 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state(
7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, cq, tag));
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
gpr_mu_init(&w->mu);
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 52c6e38c87..568bb2ba8d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -88,7 +88,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
// Record call finished, optionally setting client_failed_to_send and
// received.
grpc_grpclb_client_stats_add_call_finished(
- false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */,
calld->client_stats);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index ebce801b37..bb9217d843 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -416,9 +416,7 @@ struct rr_connectivity_data {
static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
bool log) {
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
- return false;
- }
+ if (server->drop) return false;
const grpc_grpclb_ip_address *ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
@@ -462,7 +460,7 @@ static const grpc_lb_user_data_vtable lb_token_vtable = {
static void parse_server(const grpc_grpclb_server *server,
grpc_resolved_address *addr) {
memset(addr, 0, sizeof(*addr));
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
+ if (server->drop) return;
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
@@ -610,7 +608,7 @@ static bool pick_from_internal_rr_locked(
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
glb_policy->serverlist_index = 0; // Wrap-around.
}
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+ if (server->drop) {
// Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
@@ -622,11 +620,8 @@ static bool pick_from_internal_rr_locked(
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
- grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
- grpc_grpclb_client_stats_add_call_finished(
- server->drop_for_rate_limiting, server->drop_for_load_balancing,
- false /* failed_to_send */, false /* known_received */,
- wc_arg->client_stats);
+ grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
+ wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
@@ -715,7 +710,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
return;
}
glb_policy->rr_policy = new_rr_policy;
-
grpc_error *rr_state_error = NULL;
const grpc_connectivity_state rr_state =
grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
@@ -741,7 +735,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
rr_connectivity->state = rr_state;
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
@@ -806,32 +800,31 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = arg;
glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
-
- const bool shutting_down = glb_policy->shutting_down;
- bool unref_needed = false;
- GRPC_ERROR_REF(error);
-
- if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
- /* RR policy shutting down. Don't renew subscription and free the arg of
- * this callback. In addition we need to stash away the current policy to
- * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
- * one, the policy would be destroyed, alongside the lock, which would
- * result in a use-after-free */
- unref_needed = true;
+ if (glb_policy->shutting_down) {
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ "glb_rr_connectivity_cb");
gpr_free(rr_connectivity);
- } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
- update_lb_connectivity_status_locked(
- exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
- /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
- grpc_lb_policy_notify_on_state_change_locked(
- exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
- &rr_connectivity->on_change);
+ return;
}
- if (unref_needed) {
+ if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
+ /* An RR policy that has transitioned into the SHUTDOWN connectivity state
+ * should not be considered for picks or updates: the SHUTDOWN state is a
+ * sink, policies can't transition back from it. .*/
+ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
+ "rr_connectivity_shutdown");
+ glb_policy->rr_policy = NULL;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "rr_connectivity_cb");
+ "glb_rr_connectivity_cb");
+ gpr_free(rr_connectivity);
+ return;
}
- GRPC_ERROR_UNREF(error);
+ /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
+ update_lb_connectivity_status_locked(
+ exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
+ /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
+ grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+ &rr_connectivity->state,
+ &rr_connectivity->on_change);
}
static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
@@ -995,7 +988,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
gpr_free(glb_policy);
return NULL;
}
-
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
@@ -1052,7 +1044,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_policy->pending_picks = NULL;
pending_ping *pping = glb_policy->pending_pings;
glb_policy->pending_pings = NULL;
- if (glb_policy->rr_policy) {
+ if (glb_policy->rr_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
}
// We destroy the LB channel here because
@@ -1309,15 +1301,14 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
}
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
+ grpc_grpclb_dropped_call_counts *drop_entries =
+ request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
- request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
- 0 &&
- request->client_stats
- .num_calls_finished_with_drop_for_load_balancing == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
0 &&
- request->client_stats.num_calls_finished_known_received == 0;
+ request->client_stats.num_calls_finished_known_received == 0 &&
+ (drop_entries == NULL || drop_entries->num_entries == 0);
}
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1332,7 +1323,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
// Construct message payload.
GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
grpc_grpclb_request *request =
- grpc_grpclb_load_report_request_create(glb_policy->client_stats);
+ grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {
@@ -1778,7 +1769,8 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
if (!glb_policy->watching_lb_channel) {
// Watch the LB channel connectivity for connection.
- glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
+ glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
+ glb_policy->lb_channel, true /* try to connect */);
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
index c762443b7c..5b62623145 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
@@ -18,8 +18,11 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@@ -29,10 +32,11 @@
struct grpc_grpclb_client_stats {
gpr_refcount refs;
+ // This field must only be accessed via *_locked() methods.
+ grpc_grpclb_dropped_call_counts* drop_token_counts;
+ // These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started;
gpr_atm num_calls_finished;
- gpr_atm num_calls_finished_with_drop_for_rate_limiting;
- gpr_atm num_calls_finished_with_drop_for_load_balancing;
gpr_atm num_calls_finished_with_client_failed_to_send;
gpr_atm num_calls_finished_known_received;
};
@@ -51,6 +55,7 @@ grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
if (gpr_unref(&client_stats->refs)) {
+ grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
gpr_free(client_stats);
}
}
@@ -61,21 +66,9 @@ void grpc_grpclb_client_stats_add_call_started(
}
void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_drop_for_rate_limiting,
- bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
- if (finished_with_drop_for_rate_limiting) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_drop_for_rate_limiting,
- (gpr_atm)1);
- }
- if (finished_with_drop_for_load_balancing) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_drop_for_load_balancing,
- (gpr_atm)1);
- }
if (finished_with_client_failed_to_send) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_client_failed_to_send,
@@ -87,32 +80,70 @@ void grpc_grpclb_client_stats_add_call_finished(
}
}
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+ char* token, grpc_grpclb_client_stats* client_stats) {
+ // Increment num_calls_started and num_calls_finished.
+ gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
+ gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
+ // Record the drop.
+ if (client_stats->drop_token_counts == NULL) {
+ client_stats->drop_token_counts =
+ gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts));
+ }
+ grpc_grpclb_dropped_call_counts* drop_token_counts =
+ client_stats->drop_token_counts;
+ for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
+ if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
+ ++drop_token_counts->token_counts[i].count;
+ return;
+ }
+ }
+ // Not found, so add a new entry. We double the size of the array each time.
+ size_t new_num_entries = 2;
+ while (new_num_entries < drop_token_counts->num_entries + 1) {
+ new_num_entries *= 2;
+ }
+ drop_token_counts->token_counts =
+ gpr_realloc(drop_token_counts->token_counts,
+ new_num_entries * sizeof(grpc_grpclb_drop_token_count));
+ grpc_grpclb_drop_token_count* new_entry =
+ &drop_token_counts->token_counts[drop_token_counts->num_entries++];
+ new_entry->token = gpr_strdup(token);
+ new_entry->count = 1;
+}
+
static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
*value = (int64_t)gpr_atm_acq_load(counter);
gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
}
-void grpc_grpclb_client_stats_get(
+void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_drop_for_rate_limiting,
- int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received) {
+ int64_t* num_calls_finished_known_received,
+ grpc_grpclb_dropped_call_counts** drop_token_counts) {
atomic_get_and_reset_counter(num_calls_started,
&client_stats->num_calls_started);
atomic_get_and_reset_counter(num_calls_finished,
&client_stats->num_calls_finished);
atomic_get_and_reset_counter(
- num_calls_finished_with_drop_for_rate_limiting,
- &client_stats->num_calls_finished_with_drop_for_rate_limiting);
- atomic_get_and_reset_counter(
- num_calls_finished_with_drop_for_load_balancing,
- &client_stats->num_calls_finished_with_drop_for_load_balancing);
- atomic_get_and_reset_counter(
num_calls_finished_with_client_failed_to_send,
&client_stats->num_calls_finished_with_client_failed_to_send);
atomic_get_and_reset_counter(
num_calls_finished_known_received,
&client_stats->num_calls_finished_known_received);
+ *drop_token_counts = client_stats->drop_token_counts;
+ client_stats->drop_token_counts = NULL;
+}
+
+void grpc_grpclb_dropped_call_counts_destroy(
+ grpc_grpclb_dropped_call_counts* drop_entries) {
+ if (drop_entries != NULL) {
+ for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ gpr_free(drop_entries->token_counts[i].token);
+ }
+ gpr_free(drop_entries->token_counts);
+ gpr_free(drop_entries);
+ }
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index 4bb47d5c5c..c51e2a431a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -25,6 +25,16 @@
typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
+typedef struct {
+ char* token;
+ int64_t count;
+} grpc_grpclb_drop_token_count;
+
+typedef struct {
+ grpc_grpclb_drop_token_count* token_counts;
+ size_t num_entries;
+} grpc_grpclb_dropped_call_counts;
+
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats);
@@ -33,18 +43,23 @@ void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_drop_for_rate_limiting,
- bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats);
-void grpc_grpclb_client_stats_get(
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+ char* token, grpc_grpclb_client_stats* client_stats);
+
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_drop_for_rate_limiting,
- int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received);
+ int64_t* num_calls_finished_known_received,
+ grpc_grpclb_dropped_call_counts** drop_token_counts);
+
+void grpc_grpclb_dropped_call_counts_destroy(
+ grpc_grpclb_dropped_call_counts* drop_entries);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index bec7c97a78..6fa29f326e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -76,7 +76,33 @@ static void populate_timestamp(gpr_timespec timestamp,
timestamp_pb->nanos = timestamp.tv_nsec;
}
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+static bool encode_string(pb_ostream_t *stream, const pb_field_t *field,
+ void *const *arg) {
+ char *str = *arg;
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ return pb_encode_string(stream, (uint8_t *)str, strlen(str));
+}
+
+static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field,
+ void *const *arg) {
+ grpc_grpclb_dropped_call_counts *drop_entries = *arg;
+ if (drop_entries == NULL) return true;
+ for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ grpc_lb_v1_ClientStatsPerToken drop_message;
+ drop_message.load_balance_token.funcs.encode = encode_string;
+ drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
+ drop_message.has_num_calls = true;
+ drop_message.num_calls = drop_entries->token_counts[i].count;
+ if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
+ &drop_message)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats) {
grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request));
req->has_client_stats = true;
@@ -84,18 +110,17 @@ grpc_grpclb_request *grpc_grpclb_load_report_request_create(
populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
req->client_stats.has_num_calls_started = true;
req->client_stats.has_num_calls_finished = true;
- req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true;
- req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_known_received = true;
- grpc_grpclb_client_stats_get(
+ req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
+ grpc_grpclb_client_stats_get_locked(
client_stats, &req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
- &req->client_stats.num_calls_finished_with_drop_for_rate_limiting,
- &req->client_stats.num_calls_finished_with_drop_for_load_balancing,
&req->client_stats.num_calls_finished_with_client_failed_to_send,
- &req->client_stats.num_calls_finished_known_received);
+ &req->client_stats.num_calls_finished_known_received,
+ (grpc_grpclb_dropped_call_counts **)&req->client_stats
+ .calls_finished_with_drop.arg);
return req;
}
@@ -117,6 +142,11 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
}
void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
+ if (request->has_client_stats) {
+ grpc_grpclb_dropped_call_counts *drop_entries =
+ request->client_stats.calls_finished_with_drop.arg;
+ grpc_grpclb_dropped_call_counts_destroy(drop_entries);
+ }
gpr_free(request);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index ef8d563edc..c4a98492c9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -44,7 +44,7 @@ typedef struct {
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name);
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats);
/** Protocol Buffers v3-encode \a request */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index fb119c7fc8..6a5d54c82a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -33,14 +33,19 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
+const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = {
+ PB_FIELD( 1, STRING , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0),
+ PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0),
+ PB_LAST_FIELD
+};
+
+const pb_field_t grpc_lb_v1_ClientStats_fields[7] = {
PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
- PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
- PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
- PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
+ PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0),
PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
+ PB_FIELD( 8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields),
PB_LAST_FIELD
};
@@ -62,12 +67,11 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_Server_fields[6] = {
+const pb_field_t grpc_lb_v1_Server_fields[5] = {
PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
- PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
- PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
+ PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0),
PB_LAST_FIELD
};
@@ -81,7 +85,7 @@ const pb_field_t grpc_lb_v1_Server_fields[6] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -92,7 +96,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index d3ae919ec2..93333d1aed 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,6 +14,13 @@ extern "C" {
#endif
/* Struct definitions */
+typedef struct _grpc_lb_v1_ClientStatsPerToken {
+ pb_callback_t load_balance_token;
+ bool has_num_calls;
+ int64_t num_calls;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */
+} grpc_lb_v1_ClientStatsPerToken;
+
typedef struct _grpc_lb_v1_Duration {
bool has_seconds;
int64_t seconds;
@@ -36,10 +43,8 @@ typedef struct _grpc_lb_v1_Server {
int32_t port;
bool has_load_balance_token;
char load_balance_token[50];
- bool has_drop_for_rate_limiting;
- bool drop_for_rate_limiting;
- bool has_drop_for_load_balancing;
- bool drop_for_load_balancing;
+ bool has_drop;
+ bool drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
@@ -58,14 +63,11 @@ typedef struct _grpc_lb_v1_ClientStats {
int64_t num_calls_started;
bool has_num_calls_finished;
int64_t num_calls_finished;
- bool has_num_calls_finished_with_drop_for_rate_limiting;
- int64_t num_calls_finished_with_drop_for_rate_limiting;
- bool has_num_calls_finished_with_drop_for_load_balancing;
- int64_t num_calls_finished_with_drop_for_load_balancing;
bool has_num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_with_client_failed_to_send;
bool has_num_calls_finished_known_received;
int64_t num_calls_finished_known_received;
+ pb_callback_t calls_finished_with_drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
@@ -107,39 +109,41 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1
+#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2
#define grpc_lb_v1_Duration_seconds_tag 1
#define grpc_lb_v1_Duration_nanos_tag 2
#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
#define grpc_lb_v1_Server_ip_address_tag 1
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
-#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_Server_drop_tag 4
#define grpc_lb_v1_Timestamp_seconds_tag 1
#define grpc_lb_v1_Timestamp_nanos_tag 2
#define grpc_lb_v1_ClientStats_timestamp_tag 1
#define grpc_lb_v1_ClientStats_num_calls_started_tag 2
#define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
#define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
#define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
+#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
#define grpc_lb_v1_ServerList_servers_tag 1
@@ -154,22 +158,24 @@ extern const pb_field_t grpc_lb_v1_Duration_fields[3];
extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
+extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[7];
extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v1_Server_fields[6];
+extern const pb_field_t grpc_lb_v1_Server_fields[5];
/* Maximum encoded size of messages (where known) */
#define grpc_lb_v1_Duration_size 22
#define grpc_lb_v1_Timestamp_size 22
-#define grpc_lb_v1_LoadBalanceRequest_size 226
+#define grpc_lb_v1_LoadBalanceRequest_size (140 + grpc_lb_v1_ClientStats_size)
#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v1_ClientStats_size 90
+/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */
+/* grpc_lb_v1_ClientStats_size depends on runtime parameters */
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
/* grpc_lb_v1_ServerList_size depends on runtime parameters */
-#define grpc_lb_v1_Server_size 85
+#define grpc_lb_v1_Server_size 83
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index bc40165cfb..a7f7e9542c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -74,6 +74,9 @@ typedef struct round_robin_lb_policy {
bool started_picking;
/** are we shutting down? */
bool shutdown;
+ /** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN? No picks can be
+ * service after this point, the policy will never transition out. */
+ bool in_connectivity_shutdown;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
@@ -420,6 +423,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(!p->in_connectivity_shutdown);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
}
@@ -532,6 +537,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
+ p->in_connectivity_shutdown = true;
new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
index 9065e33613..6ec3790a5f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -33,13 +33,13 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
-#include <nameser.h>
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/nameser.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c
index fe7b7e4a42..5a99cbeffc 100644
--- a/src/core/ext/transport/chttp2/transport/bin_decoder.c
+++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c
@@ -118,6 +118,7 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx) {
switch (input_tail) {
case 3:
ctx->output_cur[1] = COMPOSE_OUTPUT_BYTE_1(ctx->input_cur);
+ /* fallthrough */
case 2:
ctx->output_cur[0] = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index ede05d57b7..221224111e 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -114,11 +114,6 @@ static void connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state state,
grpc_error *error, const char *reason);
-static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- size_t max_size_hint,
- size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
@@ -270,8 +265,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client;
- t->outgoing_window = DEFAULT_WINDOW;
- t->incoming_window = DEFAULT_WINDOW;
+ t->flow_control.remote_window = DEFAULT_WINDOW;
+ t->flow_control.announced_window = DEFAULT_WINDOW;
+ t->flow_control.t = t;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@@ -710,6 +706,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
post_destructive_reclaimer(exec_ctx, t);
}
+ s->flow_control.s = s;
GPR_TIMER_END("init_stream", 0);
return 0;
@@ -766,13 +763,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
- if (s->incoming_window_delta > 0) {
- GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
- "destroy", t, s, s->incoming_window_delta);
- } else if (s->incoming_window_delta < 0) {
- GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA(
- "destroy", t, s, -s->incoming_window_delta);
- }
+ grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
@@ -1485,9 +1476,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0) {
- already_received = s->frame_storage.length;
- incoming_byte_stream_update_flow_control(
- exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received);
+ if (!s->read_closed) {
+ already_received = s->frame_storage.length;
+ grpc_chttp2_flowctl_incoming_bs_update(
+ &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES,
+ already_received);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx,
+ grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control),
+ t, s);
+ }
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -2237,6 +2235,37 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
* INPUT PROCESSING - PARSING
*/
+void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_flowctl_action action,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ switch (action.send_stream_update) {
+ case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+ break;
+ case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ "immediate stream flowctl");
+ break;
+ case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
+ "queue stream flowctl");
+ break;
+ }
+ switch (action.send_transport_update) {
+ case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+ break;
+ case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
+ grpc_chttp2_initiate_write(exec_ctx, t, "immediate transport flowctl");
+ break;
+ // this is the same as no action b/c every time the transport enters the
+ // writing path it will maybe do an update
+ case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
+ break;
+ }
+}
+
static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
double bdp_dbl) {
// initial window size bounded [1,2^31-1], but we set the min to 128.
@@ -2248,9 +2277,10 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) {
return;
}
- if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string,
- (int)bdp);
+ if (GRPC_TRACER_ON(grpc_bdp_estimator_trace) ||
+ GRPC_TRACER_ON(grpc_flowctl_trace)) {
+ gpr_log(GPR_DEBUG, "%s | %p[%s] | update initial window size to %d",
+ t->peer_string, t, t->is_client ? "cli" : "svr", (int)bdp);
}
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)bdp);
@@ -2350,8 +2380,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action.parse", 0);
GPR_TIMER_BEGIN("post_parse_locked", 0);
- if (t->initial_window_update != 0) {
- if (t->initial_window_update > 0) {
+ if (t->flow_control.initial_window_update != 0) {
+ if (t->flow_control.initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
grpc_chttp2_become_writable(
@@ -2359,7 +2389,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
"unstalled");
}
}
- t->initial_window_update = 0;
+ t->flow_control.initial_window_update = 0;
}
GPR_TIMER_END("post_parse_locked", 0);
}
@@ -2633,54 +2663,6 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
}
}
-static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- size_t max_size_hint,
- size_t have_already) {
- uint32_t max_recv_bytes;
- uint32_t initial_window_size =
- t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
-
- /* clamp max recv hint to an allowable size */
- if (max_size_hint >= UINT32_MAX - initial_window_size) {
- max_recv_bytes = UINT32_MAX - initial_window_size;
- } else {
- max_recv_bytes = (uint32_t)max_size_hint;
- }
-
- /* account for bytes already received but unknown to higher layers */
- if (max_recv_bytes >= have_already) {
- max_recv_bytes -= (uint32_t)have_already;
- } else {
- max_recv_bytes = 0;
- }
-
- /* add some small lookahead to keep pipelines flowing */
- GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size);
- if (s->incoming_window_delta < max_recv_bytes && !s->read_closed) {
- uint32_t add_max_recv_bytes =
- (uint32_t)(max_recv_bytes - s->incoming_window_delta);
- grpc_chttp2_stream_write_type write_type =
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED;
- if (s->incoming_window_delta + initial_window_size <
- (int64_t)have_already) {
- write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
- }
- GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s,
- add_max_recv_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
- add_max_recv_bytes);
- if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size -
- (int64_t)s->announce_window >
- (int64_t)initial_window_size / 2) {
- write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
- }
- grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
- "read_incoming_stream");
- }
-}
-
static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
void *argp,
grpc_error *error_ignored) {
@@ -2689,9 +2671,15 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s = bs->stream;
size_t cur_length = s->frame_storage.length;
- incoming_byte_stream_update_flow_control(
- exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
-
+ if (!s->read_closed) {
+ grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control,
+ bs->next_action.max_size_hint,
+ cur_length);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx,
+ grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t,
+ s);
+ }
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage,
@@ -3001,83 +2989,6 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
/*******************************************************************************
- * TRACING
- */
-
-static char *format_flowctl_context_var(const char *context, const char *var,
- int64_t val, uint32_t id) {
- char *name;
- if (context == NULL) {
- name = gpr_strdup(var);
- } else if (0 == strcmp(context, "t")) {
- GPR_ASSERT(id == 0);
- gpr_asprintf(&name, "TRANSPORT:%s", var);
- } else if (0 == strcmp(context, "s")) {
- GPR_ASSERT(id != 0);
- gpr_asprintf(&name, "STREAM[%d]:%s", id, var);
- } else {
- gpr_asprintf(&name, "BAD_CONTEXT[%s][%d]:%s", context, id, var);
- }
- char *name_fld = gpr_leftpad(name, ' ', 64);
- char *value;
- gpr_asprintf(&value, "%" PRId64, val);
- char *value_fld = gpr_leftpad(value, ' ', 8);
- char *result;
- gpr_asprintf(&result, "%s %s", name_fld, value_fld);
- gpr_free(name);
- gpr_free(name_fld);
- gpr_free(value);
- gpr_free(value_fld);
- return result;
-}
-
-void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
- grpc_chttp2_flowctl_op op, const char *context1,
- const char *var1, const char *context2,
- const char *var2, int is_client,
- uint32_t stream_id, int64_t val1, int64_t val2) {
- char *tmp_phase;
- char *label1 = format_flowctl_context_var(context1, var1, val1, stream_id);
- char *label2 = format_flowctl_context_var(context2, var2, val2, stream_id);
- char *clisvr = is_client ? "client" : "server";
- char *prefix;
-
- tmp_phase = gpr_leftpad(phase, ' ', 8);
- gpr_asprintf(&prefix, "FLOW %s: %s ", tmp_phase, clisvr);
- gpr_free(tmp_phase);
-
- switch (op) {
- case GRPC_CHTTP2_FLOWCTL_MOVE:
- if (val2 != 0) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "%sMOVE %s <- %s giving %" PRId64, prefix, label1, label2,
- val1 + val2);
- }
- break;
- case GRPC_CHTTP2_FLOWCTL_CREDIT:
- GPR_ASSERT(val2 >= 0);
- if (val2 != 0) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "%sCREDIT %s by %s giving %" PRId64, prefix, label1, label2,
- val1 + val2);
- }
- break;
- case GRPC_CHTTP2_FLOWCTL_DEBIT:
- GPR_ASSERT(val2 >= 0);
- if (val2 != 0) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "%sDEBIT %s by %s giving %" PRId64, prefix, label1, label2,
- val1 - val2);
- }
- break;
- }
-
- gpr_free(label1);
- gpr_free(label2);
- gpr_free(prefix);
-}
-
-/*******************************************************************************
* INTEGRATION GLUE
*/
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c
new file mode 100644
index 0000000000..c9f7eabd43
--- /dev/null
+++ b/src/core/ext/transport/chttp2/transport/flow_control.c
@@ -0,0 +1,369 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/ext/transport/chttp2/transport/internal.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/support/string.h"
+
+static uint32_t grpc_chttp2_target_announced_window(
+ const grpc_chttp2_transport_flowctl* tfc);
+
+#ifndef NDEBUG
+
+typedef struct {
+ int64_t remote_window;
+ int64_t target_window;
+ int64_t announced_window;
+ int64_t remote_window_delta;
+ int64_t local_window_delta;
+ int64_t announced_window_delta;
+} shadow_flow_control;
+
+static void pretrace(shadow_flow_control* shadow_fc,
+ grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc) {
+ shadow_fc->remote_window = tfc->remote_window;
+ shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc);
+ shadow_fc->announced_window = tfc->announced_window;
+ if (sfc != NULL) {
+ shadow_fc->remote_window_delta = sfc->remote_window_delta;
+ shadow_fc->local_window_delta = sfc->local_window_delta;
+ shadow_fc->announced_window_delta = sfc->announced_window_delta;
+ }
+}
+
+static char* fmt_str(int64_t old, int64_t new) {
+ char* str;
+ if (old != new) {
+ gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new);
+ } else {
+ gpr_asprintf(&str, "%" PRId64 "", old);
+ }
+ char* str_lp = gpr_leftpad(str, ' ', 30);
+ gpr_free(str);
+ return str_lp;
+}
+
+static void posttrace(shadow_flow_control* shadow_fc,
+ grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc, char* reason) {
+ uint32_t acked_local_window =
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ uint32_t remote_window =
+ tfc->t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window);
+ char* tlw_str = fmt_str(shadow_fc->target_window,
+ grpc_chttp2_target_announced_window(tfc));
+ char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window);
+ char* srw_str;
+ char* slw_str;
+ char* saw_str;
+ if (sfc != NULL) {
+ srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window,
+ sfc->remote_window_delta + remote_window);
+ slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window,
+ sfc->local_window_delta + acked_local_window);
+ saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window,
+ sfc->announced_window_delta + acked_local_window);
+ } else {
+ srw_str = gpr_leftpad("", ' ', 30);
+ slw_str = gpr_leftpad("", ' ', 30);
+ saw_str = gpr_leftpad("", ' ', 30);
+ }
+ gpr_log(GPR_DEBUG,
+ "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
+ tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr",
+ reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str);
+ gpr_free(trw_str);
+ gpr_free(tlw_str);
+ gpr_free(taw_str);
+ gpr_free(srw_str);
+ gpr_free(slw_str);
+ gpr_free(saw_str);
+}
+
+static char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
+ switch (urgency) {
+ case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+ return "no action";
+ case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
+ return "update immediately";
+ case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
+ return "queue update";
+ default:
+ GPR_UNREACHABLE_CODE(return "unknown");
+ }
+ GPR_UNREACHABLE_CODE(return "unknown");
+}
+
+static void trace_action(grpc_chttp2_flowctl_action action) {
+ gpr_log(GPR_DEBUG, "transport: %s, stream: %s",
+ urgency_to_string(action.send_transport_update),
+ urgency_to_string(action.send_stream_update));
+}
+
+#define PRETRACE(tfc, sfc) \
+ shadow_flow_control shadow_fc; \
+ GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
+#define POSTTRACE(tfc, sfc, reason) \
+ GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
+#define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action))
+#else
+#define PRETRACE(tfc, sfc)
+#define POSTTRACE(tfc, sfc, reason)
+#define TRACEACTION(action)
+#endif
+
+/* How many bytes of incoming flow control would we like to advertise */
+static uint32_t grpc_chttp2_target_announced_window(
+ const grpc_chttp2_transport_flowctl* tfc) {
+ return (uint32_t)GPR_MIN(
+ (int64_t)((1u << 31) - 1),
+ tfc->announced_stream_total_over_incoming_window +
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+}
+
+// we have sent data on the wire, we must track this in our bookkeeping for the
+// remote peer's flow control.
+void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc,
+ int64_t size) {
+ PRETRACE(tfc, sfc);
+ tfc->remote_window -= size;
+ sfc->remote_window_delta -= size;
+ POSTTRACE(tfc, sfc, " data sent");
+}
+
+static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc) {
+ if (sfc->announced_window_delta > 0) {
+ tfc->announced_stream_total_over_incoming_window -=
+ sfc->announced_window_delta;
+ } else {
+ tfc->announced_stream_total_under_incoming_window +=
+ -sfc->announced_window_delta;
+ }
+}
+
+static void announced_window_delta_postupdate(
+ grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
+ if (sfc->announced_window_delta > 0) {
+ tfc->announced_stream_total_over_incoming_window +=
+ sfc->announced_window_delta;
+ } else {
+ tfc->announced_stream_total_under_incoming_window -=
+ -sfc->announced_window_delta;
+ }
+}
+
+// We have received data from the wire. We must track this in our own flow
+// control bookkeeping.
+// Returns an error if the incoming frame violates our flow control.
+grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc,
+ int64_t incoming_frame_size) {
+ uint32_t sent_init_window =
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ uint32_t acked_init_window =
+ tfc->t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ PRETRACE(tfc, sfc);
+ if (incoming_frame_size > tfc->announced_window) {
+ char* msg;
+ gpr_asprintf(&msg,
+ "frame of size %" PRId64 " overflows local window of %" PRId64,
+ incoming_frame_size, tfc->announced_window);
+ grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ return err;
+ }
+
+ if (sfc != NULL) {
+ int64_t acked_stream_window =
+ sfc->announced_window_delta + acked_init_window;
+ int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window;
+ if (incoming_frame_size > acked_stream_window) {
+ if (incoming_frame_size <= sent_stream_window) {
+ gpr_log(
+ GPR_ERROR,
+ "Incoming frame of size %" PRId64
+ " exceeds local window size of %" PRId64
+ ".\n"
+ "The (un-acked, future) window size would be %" PRId64
+ " which is not exceeded.\n"
+ "This would usually cause a disconnection, but allowing it due to"
+ "broken HTTP2 implementations in the wild.\n"
+ "See (for example) https://github.com/netty/netty/issues/6520.",
+ incoming_frame_size, acked_stream_window, sent_stream_window);
+ } else {
+ char* msg;
+ gpr_asprintf(&msg, "frame of size %" PRId64
+ " overflows local window of %" PRId64,
+ incoming_frame_size, acked_stream_window);
+ grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ return err;
+ }
+ }
+
+ announced_window_delta_preupdate(tfc, sfc);
+ sfc->announced_window_delta -= incoming_frame_size;
+ announced_window_delta_postupdate(tfc, sfc);
+ sfc->local_window_delta -= incoming_frame_size;
+ }
+
+ tfc->announced_window -= incoming_frame_size;
+
+ POSTTRACE(tfc, sfc, " data recv");
+ return GRPC_ERROR_NONE;
+}
+
+// Returns a non zero announce integer if we should send a transport window
+// update
+uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
+ grpc_chttp2_transport_flowctl* tfc) {
+ PRETRACE(tfc, NULL);
+ uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
+ uint32_t threshold_to_send_transport_window_update =
+ tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4
+ : target_announced_window / 2;
+ if (tfc->announced_window <= threshold_to_send_transport_window_update &&
+ tfc->announced_window != target_announced_window) {
+ uint32_t announce = (uint32_t)GPR_CLAMP(
+ target_announced_window - tfc->announced_window, 0, UINT32_MAX);
+ tfc->announced_window += announce;
+ POSTTRACE(tfc, NULL, "t updt sent");
+ return announce;
+ }
+ GRPC_FLOW_CONTROL_IF_TRACING(
+ gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc,
+ tfc->t->is_client ? "cli" : "svr"));
+ return 0;
+}
+
+// Returns a non zero announce integer if we should send a stream window update
+uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
+ grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
+ PRETRACE(tfc, sfc);
+ if (sfc->local_window_delta > sfc->announced_window_delta) {
+ uint32_t announce = (uint32_t)GPR_CLAMP(
+ sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX);
+ announced_window_delta_preupdate(tfc, sfc);
+ sfc->announced_window_delta += announce;
+ announced_window_delta_postupdate(tfc, sfc);
+ POSTTRACE(tfc, sfc, "s updt sent");
+ return announce;
+ }
+ GRPC_FLOW_CONTROL_IF_TRACING(
+ gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc,
+ sfc->s->id, tfc->t->is_client ? "cli" : "svr"));
+ return 0;
+}
+
+// we have received a WINDOW_UPDATE frame for a transport
+void grpc_chttp2_flowctl_recv_transport_update(
+ grpc_chttp2_transport_flowctl* tfc, uint32_t size) {
+ PRETRACE(tfc, NULL);
+ tfc->remote_window += size;
+ POSTTRACE(tfc, NULL, "t updt recv");
+}
+
+// we have received a WINDOW_UPDATE frame for a stream
+void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc,
+ uint32_t size) {
+ PRETRACE(tfc, sfc);
+ sfc->remote_window_delta += size;
+ POSTTRACE(tfc, sfc, "s updt recv");
+}
+
+void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc,
+ size_t max_size_hint,
+ size_t have_already) {
+ PRETRACE(tfc, sfc);
+ uint32_t max_recv_bytes;
+ uint32_t sent_init_window =
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+
+ /* clamp max recv hint to an allowable size */
+ if (max_size_hint >= UINT32_MAX - sent_init_window) {
+ max_recv_bytes = UINT32_MAX - sent_init_window;
+ } else {
+ max_recv_bytes = (uint32_t)max_size_hint;
+ }
+
+ /* account for bytes already received but unknown to higher layers */
+ if (max_recv_bytes >= have_already) {
+ max_recv_bytes -= (uint32_t)have_already;
+ } else {
+ max_recv_bytes = 0;
+ }
+
+ /* add some small lookahead to keep pipelines flowing */
+ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
+ if (sfc->local_window_delta < max_recv_bytes) {
+ uint32_t add_max_recv_bytes =
+ (uint32_t)(max_recv_bytes - sfc->local_window_delta);
+ sfc->local_window_delta += add_max_recv_bytes;
+ }
+ POSTTRACE(tfc, sfc, "app st recv");
+}
+
+void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc) {
+ announced_window_delta_preupdate(tfc, sfc);
+}
+
+grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
+ const grpc_chttp2_transport_flowctl* tfc,
+ const grpc_chttp2_stream_flowctl* sfc) {
+ grpc_chttp2_flowctl_action action;
+ memset(&action, 0, sizeof(action));
+ uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
+ if (tfc->announced_window < target_announced_window / 2) {
+ action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
+ }
+ if (sfc != NULL && !sfc->s->read_closed) {
+ uint32_t sent_init_window =
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ if ((int64_t)sfc->local_window_delta >
+ (int64_t)sfc->announced_window_delta &&
+ (int64_t)sfc->announced_window_delta + sent_init_window <=
+ sent_init_window / 2) {
+ action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
+ } else if (sfc->local_window_delta > sfc->announced_window_delta) {
+ action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
+ }
+ }
+ TRACEACTION(action);
+ return action;
+}
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c
index e3e432a94a..057d3d9ed3 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.c
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.c
@@ -201,11 +201,13 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[id] != parser->value) {
- t->initial_window_update +=
+ t->flow_control.initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[id];
- if (GRPC_TRACER_ON(grpc_http_trace)) {
- gpr_log(GPR_DEBUG, "adding %d for initial_window change",
- (int)t->initial_window_update);
+ if (GRPC_TRACER_ON(grpc_http_trace) ||
+ GRPC_TRACER_ON(grpc_flowctl_trace)) {
+ gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change",
+ t, t->is_client ? "cli" : "svr",
+ (int)t->flow_control.initial_window_update);
}
}
parser->incoming_settings[id] = parser->value;
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index 682be2c89b..65f3b01d77 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -95,8 +95,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
if (t->incoming_stream_id != 0) {
if (s != NULL) {
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta,
- received_update);
+ grpc_chttp2_flowctl_recv_stream_update(
+ &t->flow_control, &s->flow_control, received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
grpc_chttp2_become_writable(
exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
@@ -104,10 +104,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
}
}
} else {
- bool was_zero = t->outgoing_window <= 0;
- GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", t, outgoing_window,
- received_update);
- bool is_zero = t->outgoing_window <= 0;
+ bool was_zero = t->flow_control.remote_window <= 0;
+ grpc_chttp2_flowctl_recv_transport_update(&t->flow_control,
+ received_update);
+ bool is_zero = t->flow_control.remote_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control");
}
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b538d1df17..f26f14dbec 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -213,6 +213,35 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
+typedef struct {
+ /** initial window change. This is tracked as we parse settings frames from
+ * the remote peer. If there is a positive delta, then we will make all
+ * streams readable since they may have become unstalled */
+ int64_t initial_window_update;
+
+ /** Our bookkeeping for the remote peer's available window */
+ int64_t remote_window;
+
+ /** calculating what we should give for local window:
+ we track the total amount of flow control over initial window size
+ across all streams: this is data that we want to receive right now (it
+ has an outstanding read)
+ and the total amount of flow control under initial window size across all
+ streams: this is data we've read early
+ we want to adjust incoming_window such that:
+ incoming_window = total_over - max(bdp - total_under, 0) */
+ int64_t announced_stream_total_over_incoming_window;
+ int64_t announced_stream_total_under_incoming_window;
+
+ /** This is out window according to what we have sent to our remote peer. The
+ * difference between this and target window is what we use to decide when
+ * to send WINDOW_UPDATE frames. */
+ int64_t announced_window;
+
+ // read only pointer back to transport for certain data
+ const grpc_chttp2_transport *t;
+} grpc_chttp2_transport_flowctl;
+
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
gpr_refcount refs;
@@ -271,7 +300,6 @@ struct grpc_chttp2_transport {
grpc_slice_buffer outbuf;
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
- int64_t outgoing_window;
/** is this a client? */
uint8_t is_client;
@@ -328,21 +356,14 @@ struct grpc_chttp2_transport {
/** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser;
- /** initial window change */
- int64_t initial_window_update;
+ grpc_chttp2_transport_flowctl flow_control;
- /** window available for peer to send to us */
- int64_t incoming_window;
- /** calculating what we should give for incoming window:
- we track the total amount of flow control over initial window size
- across all streams: this is data that we want to receive right now (it
- has an outstanding read)
- and the total amount of flow control under initial window size across all
- streams: this is data we've read early
- we want to adjust incoming_window such that:
- incoming_window = total_over - max(bdp - total_under, 0) */
- int64_t stream_total_over_incoming_window;
- int64_t stream_total_under_incoming_window;
+ /* bdp estimation */
+ grpc_bdp_estimator bdp_estimator;
+
+ /* pid controller */
+ grpc_pid_controller pid_controller;
+ gpr_timespec last_pid_update;
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
@@ -369,11 +390,8 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool;
/* bdp estimator */
- grpc_bdp_estimator bdp_estimator;
- grpc_pid_controller pid_controller;
grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked;
- gpr_timespec last_pid_update;
/* if non-NULL, close the transport with this error when writes are finished
*/
@@ -422,6 +440,25 @@ typedef enum {
GPRC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method;
+typedef struct {
+ /** window available for us to send to peer, over or under the initial window
+ * size of the transport... ie:
+ * remote_window = remote_window_delta + transport.initial_window_size */
+ int64_t remote_window_delta;
+
+ /** window available for peer to send to us (as a delta on
+ * transport.initial_window_size)
+ * local_window = local_window_delta + transport.initial_window_size */
+ int64_t local_window_delta;
+
+ /** window available for peer to send to us over this stream that we have
+ * announced to the peer */
+ int64_t announced_window_delta;
+
+ // read only pointer back to stream for data
+ const grpc_chttp2_stream *s;
+} grpc_chttp2_stream_flowctl;
+
struct grpc_chttp2_stream {
grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
@@ -435,10 +472,6 @@ struct grpc_chttp2_stream {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
- /** window available for us to send to peer, over or under the initial window
- * size of the transport... ie:
- * outgoing_window = outgoing_window_delta + transport.initial_window_size */
- int64_t outgoing_window_delta;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
grpc_closure *send_initial_metadata_finished;
@@ -505,10 +538,6 @@ struct grpc_chttp2_stream {
grpc_error *forced_close_error;
/** how many header frames have we received? */
uint8_t header_frames_received;
- /** window available for peer to send to us (as a delta on
- * transport.initial_window_size)
- * incoming_window = incoming_window_delta + transport.initial_window_size */
- int64_t incoming_window_delta;
/** parsing state for data frames */
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
@@ -519,8 +548,9 @@ struct grpc_chttp2_stream {
bool sent_initial_metadata;
bool sent_trailing_metadata;
- /** how much window should we announce? */
- uint32_t announce_window;
+
+ grpc_chttp2_stream_flowctl flow_control;
+
grpc_slice_buffer flow_controlled_buffer;
grpc_chttp2_write_cb *on_write_finished_cbs;
@@ -621,6 +651,75 @@ bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
+/********* Flow Control ***************/
+
+// we have sent data on the wire
+void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc,
+ grpc_chttp2_stream_flowctl *sfc,
+ int64_t size);
+
+// we have received data from the wire
+grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc,
+ grpc_chttp2_stream_flowctl *sfc,
+ int64_t incoming_frame_size);
+
+// returns an announce if we should send a transport update to our peer,
+// else returns zero
+uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
+ grpc_chttp2_transport_flowctl *tfc);
+
+// returns an announce if we should send a stream update to our peer, else
+// returns zero
+uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
+ grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
+
+// we have received a WINDOW_UPDATE frame for a transport
+void grpc_chttp2_flowctl_recv_transport_update(
+ grpc_chttp2_transport_flowctl *tfc, uint32_t size);
+
+// we have received a WINDOW_UPDATE frame for a stream
+void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc,
+ grpc_chttp2_stream_flowctl *sfc,
+ uint32_t size);
+
+// the application is asking for a certain amount of bytes
+void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc,
+ grpc_chttp2_stream_flowctl *sfc,
+ size_t max_size_hint,
+ size_t have_already);
+
+void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc,
+ grpc_chttp2_stream_flowctl *sfc);
+
+typedef enum {
+ // Nothing to be done.
+ GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0,
+ // Initiate a write to update the initial window immediately.
+ GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY,
+ // Push the flow control update into a send buffer, to be sent
+ // out the next time a write is initiated.
+ GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE,
+} grpc_chttp2_flowctl_urgency;
+
+typedef struct {
+ grpc_chttp2_flowctl_urgency send_stream_update;
+ grpc_chttp2_flowctl_urgency send_transport_update;
+} grpc_chttp2_flowctl_action;
+
+// Reads the flow control data and returns and actionable struct that will tell
+// chttp2 exactly what it needs to do
+grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
+ const grpc_chttp2_transport_flowctl *tfc,
+ const grpc_chttp2_stream_flowctl *sfc);
+
+// Takes in a flow control action and performs all the needed operations.
+void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_flowctl_action action,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+
+/********* End of Flow Control ***************/
+
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
uint32_t id);
grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
@@ -651,126 +750,22 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
extern grpc_tracer_flag grpc_http_trace;
extern grpc_tracer_flag grpc_flowctl_trace;
+#ifndef NDEBUG
+#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) \
+ if (!(GRPC_TRACER_ON(grpc_flowctl_trace))) \
+ ; \
+ else \
+ stmt
+#else
+#define GRPC_FLOW_CONTROL_IF_TRACING(stmt)
+#endif
+
#define GRPC_CHTTP2_IF_TRACING(stmt) \
if (!(GRPC_TRACER_ON(grpc_http_trace))) \
; \
else \
stmt
-typedef enum {
- GRPC_CHTTP2_FLOWCTL_MOVE,
- GRPC_CHTTP2_FLOWCTL_CREDIT,
- GRPC_CHTTP2_FLOWCTL_DEBIT
-} grpc_chttp2_flowctl_op;
-
-#define GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, id1, id2, dst_context, \
- dst_var, src_context, src_var) \
- do { \
- assert(id1 == id2); \
- if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \
- grpc_chttp2_flowctl_trace( \
- __FILE__, __LINE__, phase, GRPC_CHTTP2_FLOWCTL_MOVE, #dst_context, \
- #dst_var, #src_context, #src_var, transport->is_client, id1, \
- dst_context->dst_var, src_context->src_var); \
- } \
- dst_context->dst_var += src_context->src_var; \
- src_context->src_var = 0; \
- } while (0)
-
-#define GRPC_CHTTP2_FLOW_MOVE_STREAM(phase, transport, dst_context, dst_var, \
- src_context, src_var) \
- GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, dst_context->id, \
- src_context->id, dst_context, dst_var, \
- src_context, src_var)
-#define GRPC_CHTTP2_FLOW_MOVE_TRANSPORT(phase, dst_context, dst_var, \
- src_context, src_var) \
- GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, dst_context, 0, 0, dst_context, dst_var, \
- src_context, src_var)
-
-#define GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, id, dst_context, \
- dst_var, amount) \
- do { \
- if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \
- grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \
- GRPC_CHTTP2_FLOWCTL_CREDIT, #dst_context, \
- #dst_var, NULL, #amount, transport->is_client, \
- id, dst_context->dst_var, amount); \
- } \
- dst_context->dst_var += amount; \
- } while (0)
-
-#define GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, dst_var, \
- amount) \
- GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, dst_context->id, \
- dst_context, dst_var, amount)
-#define GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT(phase, dst_context, dst_var, amount) \
- GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
- amount)
-
-#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \
- phase, transport, dst_context) \
- if (dst_context->incoming_window_delta < 0) { \
- transport->stream_total_under_incoming_window += \
- dst_context->incoming_window_delta; \
- } else if (dst_context->incoming_window_delta > 0) { \
- transport->stream_total_over_incoming_window -= \
- dst_context->incoming_window_delta; \
- }
-
-#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \
- phase, transport, dst_context) \
- if (dst_context->incoming_window_delta < 0) { \
- transport->stream_total_under_incoming_window -= \
- dst_context->incoming_window_delta; \
- } else if (dst_context->incoming_window_delta > 0) { \
- transport->stream_total_over_incoming_window += \
- dst_context->incoming_window_delta; \
- }
-
-#define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \
- phase, transport, dst_context, amount) \
- GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
- dst_context); \
- GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \
- incoming_window_delta, amount); \
- GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
- dst_context);
-
-#define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \
- phase, transport, dst_context, amount) \
- GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
- dst_context); \
- GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \
- incoming_window_delta, amount); \
- GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
- dst_context);
-
-#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \
- dst_var, amount) \
- do { \
- if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \
- grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \
- GRPC_CHTTP2_FLOWCTL_DEBIT, #dst_context, \
- #dst_var, NULL, #amount, transport->is_client, \
- id, dst_context->dst_var, amount); \
- } \
- dst_context->dst_var -= amount; \
- } while (0)
-
-#define GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, dst_var, \
- amount) \
- GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, dst_context->id, \
- dst_context, dst_var, amount)
-#define GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT(phase, dst_context, dst_var, amount) \
- GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
- amount)
-
-void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
- grpc_chttp2_flowctl_op op, const char *context1,
- const char *var1, const char *context2,
- const char *var2, int is_client,
- uint32_t stream_id, int64_t val1, int64_t val2);
-
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *stream, grpc_error *error);
void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
@@ -872,8 +867,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
-uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t);
-
/** Set the default keepalive configurations, must only be called at
initialization */
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 9d46cfa22e..18d163ee98 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -349,93 +349,25 @@ void grpc_chttp2_parsing_become_skip_parser(grpc_exec_ctx *exec_ctx,
t->parser == grpc_chttp2_header_parser_parse);
}
-static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- uint32_t incoming_frame_size = t->incoming_frame_size;
- if (incoming_frame_size > t->incoming_window) {
- char *msg;
- gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64,
- t->incoming_frame_size, t->incoming_window);
- grpc_error *err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- gpr_free(msg);
- return err;
- }
-
- if (s != NULL) {
- if (incoming_frame_size >
- s->incoming_window_delta +
- t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
- if (incoming_frame_size <=
- s->incoming_window_delta +
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
- gpr_log(
- GPR_ERROR,
- "Incoming frame of size %d exceeds incoming window size of %" PRId64
- ".\n"
- "The (un-acked, future) window size would be %" PRId64
- " which is not exceeded.\n"
- "This would usually cause a disconnection, but allowing it due to "
- "broken HTTP2 implementations in the wild.\n"
- "See (for example) https://github.com/netty/netty/issues/6520.",
- t->incoming_frame_size,
- s->incoming_window_delta +
- t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
- s->incoming_window_delta +
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
- } else {
- char *msg;
- gpr_asprintf(&msg,
- "frame of size %d overflows incoming window of %" PRId64,
- t->incoming_frame_size,
- s->incoming_window_delta +
- t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
- grpc_error *err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- gpr_free(msg);
- return err;
- }
- }
-
- GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s,
- incoming_frame_size);
- if ((int64_t)s->incoming_window_delta - (int64_t)s->announce_window <=
- -(int64_t)t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] /
- 2) {
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
- "window-update-required");
- }
- s->received_bytes += incoming_frame_size;
- }
-
- uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
- incoming_frame_size);
- if (t->incoming_window <= target_incoming_window / 2) {
- grpc_chttp2_initiate_write(exec_ctx, t, "flow_control");
- }
-
- return GRPC_ERROR_NONE;
-}
-
static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_NONE;
- err = update_incoming_window(exec_ctx, t, s);
+ err = grpc_chttp2_flowctl_recv_data(&t->flow_control,
+ s == NULL ? NULL : &s->flow_control,
+ t->incoming_frame_size);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, grpc_chttp2_flowctl_get_action(
+ &t->flow_control, s == NULL ? NULL : &s->flow_control),
+ t, s);
if (err != GRPC_ERROR_NONE) {
goto error_handler;
}
if (s == NULL) {
return init_skip_frame_parser(exec_ctx, t, 0);
}
+ s->received_bytes += t->incoming_frame_size;
s->stats.incoming.framing_bytes += 9;
if (err == GRPC_ERROR_NONE && s->read_closed) {
return init_skip_frame_parser(exec_ctx, t, 0);
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 1bf5b34510..7cc85dea9c 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -150,12 +150,17 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
+ GRPC_FLOW_CONTROL_IF_TRACING(
+ gpr_log(GPR_DEBUG, "stream %u stalled by transport", s->id));
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
- return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
+ bool ret = stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
+ GRPC_FLOW_CONTROL_IF_TRACING(if (ret) gpr_log(
+ GPR_DEBUG, "stream %u un-stalled by transport", (*s)->id));
+ return ret;
}
void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
@@ -165,15 +170,23 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
+ GRPC_FLOW_CONTROL_IF_TRACING(
+ gpr_log(GPR_DEBUG, "stream %u stalled by stream", s->id));
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
}
bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
- return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+ bool ret = stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+ GRPC_FLOW_CONTROL_IF_TRACING(
+ if (ret) gpr_log(GPR_DEBUG, "stream %u un-stalled by stream", (*s)->id));
+ return ret;
}
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+ bool ret = stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+ GRPC_FLOW_CONTROL_IF_TRACING(
+ if (ret) gpr_log(GPR_DEBUG, "stream %u un-stalled by stream", s->id));
+ return ret;
}
diff --git a/src/core/ext/transport/chttp2/transport/varint.c b/src/core/ext/transport/chttp2/transport/varint.c
index 5f93a23a94..0d94ddcbc3 100644
--- a/src/core/ext/transport/chttp2/transport/varint.c
+++ b/src/core/ext/transport/chttp2/transport/varint.c
@@ -37,12 +37,16 @@ void grpc_chttp2_hpack_write_varint_tail(uint32_t tail_value, uint8_t* target,
switch (tail_length) {
case 5:
target[4] = (uint8_t)((tail_value >> 28) | 0x80);
+ /* fallthrough */
case 4:
target[3] = (uint8_t)((tail_value >> 21) | 0x80);
+ /* fallthrough */
case 3:
target[2] = (uint8_t)((tail_value >> 14) | 0x80);
+ /* fallthrough */
case 2:
target[1] = (uint8_t)((tail_value >> 7) | 0x80);
+ /* fallthrough */
case 1:
target[0] = (uint8_t)((tail_value) | 0x80);
}
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index c3ede08343..80eb51ff0d 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -148,15 +148,6 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
return true;
}
-/* How many bytes of incoming flow control would we like to advertise */
-uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) {
- return (uint32_t)GPR_MIN(
- (int64_t)((1u << 31) - 1),
- t->stream_total_over_incoming_window +
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
-}
-
/* How many bytes would we like to put on the wire during a single syscall */
static uint32_t target_write_size(grpc_chttp2_transport *t) {
return 1024 * 1024;
@@ -201,7 +192,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
&t->hpack_compressor,
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
- if (t->outgoing_window > 0) {
+ if (t->flow_control.remote_window > 0) {
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) &&
stream_ref_if_not_destroyed(&s->refcount->refs)) {
@@ -227,10 +218,12 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
bool sent_initial_metadata = s->sent_initial_metadata;
bool now_writing = false;
- GRPC_CHTTP2_IF_TRACING(gpr_log(
- GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t,
- t->is_client ? "CLIENT" : "SERVER", s->id, sent_initial_metadata,
- s->send_initial_metadata != NULL, s->announce_window));
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t,
+ t->is_client ? "CLIENT" : "SERVER", s->id,
+ sent_initial_metadata, s->send_initial_metadata != NULL,
+ (int)(s->flow_control.local_window_delta -
+ s->flow_control.announced_window_delta)));
grpc_mdelem *extra_headers_for_trailing_metadata[2];
size_t num_extra_headers_for_trailing_metadata = 0;
@@ -287,11 +280,12 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
sent_initial_metadata = true;
}
/* send any window updates */
- if (s->announce_window > 0) {
- uint32_t announce = s->announce_window;
- grpc_slice_buffer_add(&t->outbuf,
- grpc_chttp2_window_update_create(
- s->id, s->announce_window, &s->stats.outgoing));
+ uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(
+ &t->flow_control, &s->flow_control);
+ if (stream_announce > 0) {
+ grpc_slice_buffer_add(
+ &t->outbuf, grpc_chttp2_window_update_create(s->id, stream_announce,
+ &s->stats.outgoing));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
@@ -299,22 +293,21 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0 ||
(s->stream_compression_send_enabled &&
s->compressed_data_buffer->length > 0)) {
- uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
+ uint32_t stream_remote_window = (uint32_t)GPR_MAX(
0,
- s->outgoing_window_delta +
+ s->flow_control.remote_window_delta +
(int64_t)t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
uint32_t max_outgoing = (uint32_t)GPR_MIN(
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- GPR_MIN(stream_outgoing_window, t->outgoing_window));
+ GPR_MIN(stream_remote_window, t->flow_control.remote_window));
if (max_outgoing > 0) {
bool is_last_data_frame = false;
bool is_last_frame = false;
@@ -335,10 +328,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_chttp2_encode_data(s->id, s->compressed_data_buffer,
send_bytes, is_last_frame,
&s->stats.outgoing, &t->outbuf);
- GRPC_CHTTP2_FLOW_DEBIT_STREAM(
- "write", t, s, outgoing_window_delta, send_bytes);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
- send_bytes);
+ grpc_chttp2_flowctl_sent_data(&t->flow_control,
+ &s->flow_control, send_bytes);
max_outgoing -= send_bytes;
if (s->compressed_data_buffer->length == 0) {
s->sending_bytes += s->uncompressed_data_size;
@@ -367,10 +358,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer,
send_bytes, is_last_frame,
&s->stats.outgoing, &t->outbuf);
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
+ grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control,
send_bytes);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
- send_bytes);
s->sending_bytes += send_bytes;
}
t->ping_state.pings_before_data_required =
@@ -396,10 +385,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s);
}
- } else if (t->outgoing_window == 0) {
+ } else if (t->flow_control.remote_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(t, s);
now_writing = true;
- } else if (stream_outgoing_window == 0) {
+ } else if (stream_remote_window == 0) {
grpc_chttp2_list_add_stalled_by_stream(t, s);
now_writing = true;
}
@@ -453,22 +442,15 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
}
- /* if the grpc_chttp2_transport is ready to send a window update, do so here
- also; 3/4 is a magic number that will likely get tuned soon */
- uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
- uint32_t threshold_to_send_transport_window_update =
- t->outbuf.count > 0 ? 3 * target_incoming_window / 4
- : target_incoming_window / 2;
- if (t->incoming_window <= threshold_to_send_transport_window_update &&
- t->incoming_window != target_incoming_window) {
+ uint32_t transport_announce =
+ grpc_chttp2_flowctl_maybe_send_transport_update(&t->flow_control);
+ if (transport_announce) {
maybe_initiate_ping(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
- uint32_t announced = (uint32_t)GPR_CLAMP(
- target_incoming_window - t->incoming_window, 0, UINT32_MAX);
- GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("write", t, incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
- grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create(
- 0, announced, &throwaway_stats));
+ grpc_slice_buffer_add(
+ &t->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
+ &throwaway_stats));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 29dfa885de..776e0765fe 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -968,6 +968,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
+ if (url) {
+ gpr_free(url);
+ }
unsigned int header_index;
for (header_index = 0; header_index < s->header_array.count;
header_index++) {
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index c72c37e2b5..9b66987b68 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -73,10 +73,8 @@ static const grpc_closure_scheduler_vtable finally_scheduler = {
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
grpc_combiner *grpc_combiner_create(void) {
- grpc_combiner *lock = gpr_malloc(sizeof(*lock));
+ grpc_combiner *lock = gpr_zalloc(sizeof(*lock));
gpr_ref_init(&lock->refs, 1);
- lock->next_combiner_on_this_exec_ctx = NULL;
- lock->time_to_execute_final_list = false;
lock->scheduler.vtable = &scheduler;
lock->finally_scheduler.vtable = &finally_scheduler;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
diff --git a/src/core/lib/iomgr/nameser.h b/src/core/lib/iomgr/nameser.h
new file mode 100644
index 0000000000..daed6de518
--- /dev/null
+++ b/src/core/lib/iomgr/nameser.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_NAMESER_H
+#define GRPC_CORE_LIB_IOMGR_NAMESER_H
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_HAVE_ARPA_NAMESER
+
+#include <arpa/nameser.h>
+
+#else /* GRPC_HAVE_ARPA_NAMESER */
+
+typedef enum __ns_class {
+ ns_c_invalid = 0, /* Cookie. */
+ ns_c_in = 1, /* Internet. */
+ ns_c_2 = 2, /* unallocated/unsupported. */
+ ns_c_chaos = 3, /* MIT Chaos-net. */
+ ns_c_hs = 4, /* MIT Hesiod. */
+ /* Query class values which do not appear in resource records */
+ ns_c_none = 254, /* for prereq. sections in update requests */
+ ns_c_any = 255, /* Wildcard match. */
+ ns_c_max = 65536
+} ns_class;
+
+typedef enum __ns_type {
+ ns_t_invalid = 0, /* Cookie. */
+ ns_t_a = 1, /* Host address. */
+ ns_t_ns = 2, /* Authoritative server. */
+ ns_t_md = 3, /* Mail destination. */
+ ns_t_mf = 4, /* Mail forwarder. */
+ ns_t_cname = 5, /* Canonical name. */
+ ns_t_soa = 6, /* Start of authority zone. */
+ ns_t_mb = 7, /* Mailbox domain name. */
+ ns_t_mg = 8, /* Mail group member. */
+ ns_t_mr = 9, /* Mail rename name. */
+ ns_t_null = 10, /* Null resource record. */
+ ns_t_wks = 11, /* Well known service. */
+ ns_t_ptr = 12, /* Domain name pointer. */
+ ns_t_hinfo = 13, /* Host information. */
+ ns_t_minfo = 14, /* Mailbox information. */
+ ns_t_mx = 15, /* Mail routing information. */
+ ns_t_txt = 16, /* Text strings. */
+ ns_t_rp = 17, /* Responsible person. */
+ ns_t_afsdb = 18, /* AFS cell database. */
+ ns_t_x25 = 19, /* X_25 calling address. */
+ ns_t_isdn = 20, /* ISDN calling address. */
+ ns_t_rt = 21, /* Router. */
+ ns_t_nsap = 22, /* NSAP address. */
+ ns_t_nsap_ptr = 23, /* Reverse NSAP lookup (deprecated). */
+ ns_t_sig = 24, /* Security signature. */
+ ns_t_key = 25, /* Security key. */
+ ns_t_px = 26, /* X.400 mail mapping. */
+ ns_t_gpos = 27, /* Geographical position (withdrawn). */
+ ns_t_aaaa = 28, /* Ip6 Address. */
+ ns_t_loc = 29, /* Location Information. */
+ ns_t_nxt = 30, /* Next domain (security). */
+ ns_t_eid = 31, /* Endpoint identifier. */
+ ns_t_nimloc = 32, /* Nimrod Locator. */
+ ns_t_srv = 33, /* Server Selection. */
+ ns_t_atma = 34, /* ATM Address */
+ ns_t_naptr = 35, /* Naming Authority PoinTeR */
+ ns_t_kx = 36, /* Key Exchange */
+ ns_t_cert = 37, /* Certification record */
+ ns_t_a6 = 38, /* IPv6 address (deprecates AAAA) */
+ ns_t_dname = 39, /* Non-terminal DNAME (for IPv6) */
+ ns_t_sink = 40, /* Kitchen sink (experimentatl) */
+ ns_t_opt = 41, /* EDNS0 option (meta-RR) */
+ ns_t_apl = 42, /* Address prefix list (RFC3123) */
+ ns_t_ds = 43, /* Delegation Signer (RFC4034) */
+ ns_t_sshfp = 44, /* SSH Key Fingerprint (RFC4255) */
+ ns_t_rrsig = 46, /* Resource Record Signature (RFC4034) */
+ ns_t_nsec = 47, /* Next Secure (RFC4034) */
+ ns_t_dnskey = 48, /* DNS Public Key (RFC4034) */
+ ns_t_tkey = 249, /* Transaction key */
+ ns_t_tsig = 250, /* Transaction signature. */
+ ns_t_ixfr = 251, /* Incremental zone transfer. */
+ ns_t_axfr = 252, /* Transfer zone of authority. */
+ ns_t_mailb = 253, /* Transfer mailbox records. */
+ ns_t_maila = 254, /* Transfer mail agent records. */
+ ns_t_any = 255, /* Wildcard match. */
+ ns_t_zxfr = 256, /* BIND-specific, nonstandard. */
+ ns_t_max = 65536
+} ns_type;
+
+#endif /* GRPC_HAVE_ARPA_NAMESER */
+
+#endif /* GRPC_CORE_LIB_IOMGR_NAMESER_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index f5d15b4850..c12058f890 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -24,6 +24,7 @@
#if defined(GRPC_UV)
// Do nothing
#elif defined(GPR_MANYLINUX1)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -51,6 +52,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_LINUX)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -82,6 +84,7 @@
#define GRPC_POSIX_SOCKETUTILS
#endif
#elif defined(GPR_APPLE)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
@@ -93,6 +96,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_FREEBSD)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
@@ -104,6 +108,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
diff --git a/src/core/lib/json/json_reader.c b/src/core/lib/json/json_reader.c
index 75f59e0912..094a35176c 100644
--- a/src/core/lib/json/json_reader.c
+++ b/src/core/lib/json/json_reader.c
@@ -178,6 +178,7 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader *reader) {
json_reader_string_clear(reader);
reader->state = GRPC_JSON_STATE_VALUE_END;
/* The missing break here is intentional. */
+ /* fallthrough */
case GRPC_JSON_STATE_VALUE_END:
case GRPC_JSON_STATE_OBJECT_KEY_BEGIN:
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index 239a211c0b..b9da6e16b2 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -147,7 +147,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
goto done;
}
// Get unused bytes.
- unsigned char *unused_bytes = NULL;
+ const unsigned char *unused_bytes = NULL;
size_t unused_bytes_size = 0;
result = tsi_handshaker_result_get_unused_bytes(
h->handshaker_result, &unused_bytes, &unused_bytes_size);
diff --git a/src/core/lib/support/murmur_hash.c b/src/core/lib/support/murmur_hash.c
index f329611818..f06b970de7 100644
--- a/src/core/lib/support/murmur_hash.c
+++ b/src/core/lib/support/murmur_hash.c
@@ -62,8 +62,10 @@ uint32_t gpr_murmur_hash3(const void *key, size_t len, uint32_t seed) {
switch (len & 3) {
case 3:
k1 ^= ((uint32_t)tail[2]) << 16;
+ /* fallthrough */
case 2:
k1 ^= ((uint32_t)tail[1]) << 8;
+ /* fallthrough */
case 1:
k1 ^= tail[0];
k1 *= c1;
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index ef8405cca8..55934964f3 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -18,6 +18,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
@@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->cq = cq;
alarm->tag = tag;
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index bfd2b45fcb..e18874d054 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -648,6 +648,8 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
static grpc_error *error_from_status(grpc_status_code status,
const char *description) {
+ // copying 'description' is needed to ensure the grpc_call_cancel_with_status
+ // guarantee that can be short-lived.
return grpc_error_set_int(
grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
GRPC_ERROR_STR_GRPC_MESSAGE,
@@ -898,7 +900,7 @@ grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) {
return call->incoming_stream_compression_algorithm;
}
-static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
+static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) {
return (grpc_linked_mdelem *)&md->internal_data;
}
@@ -922,7 +924,7 @@ static int prepare_application_metadata(
for (i = 0; i < total_count; i++) {
const grpc_metadata *md =
get_md_elem(metadata, additional_metadata, i, count);
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ grpc_linked_mdelem *l = linked_from_md(md);
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
if (!GRPC_LOG_IF_ERROR("validate_metadata",
grpc_validate_header_key_is_legal(md->key))) {
@@ -939,7 +941,7 @@ static int prepare_application_metadata(
for (int j = 0; j < i; j++) {
const grpc_metadata *md =
get_md_elem(metadata, additional_metadata, j, count);
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ grpc_linked_mdelem *l = linked_from_md(md);
GRPC_MDELEM_UNREF(exec_ctx, l->md);
}
return 0;
@@ -957,9 +959,12 @@ static int prepare_application_metadata(
}
for (i = 0; i < total_count; i++) {
grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
- GRPC_LOG_IF_ERROR(
- "prepare_application_metadata",
- grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md)));
+ grpc_linked_mdelem *l = linked_from_md(md);
+ grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l);
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ }
+ GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
}
call->send_extra_metadata_count = 0;
@@ -1571,7 +1576,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1900,7 +1905,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
@@ -2021,6 +2026,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) {
return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
+ return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
case GRPC_CALL_OK:
return "GRPC_CALL_OK";
}
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 80eb80af78..e85b308850 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 978d7b4171..3d82a32e82 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
- void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ bool (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
@@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
@@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
-}
-
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_ref(&cqd->pending_events);
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
-#ifndef NDEBUG
- gpr_mu_lock(cq->mu);
- if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
- cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags =
- gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
- cq->outstanding_tag_capacity);
- }
- cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cq->mu);
-#endif
- cq->vtable->begin_op(cq, tag);
-}
-
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
@@ -576,6 +549,41 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ while (true) {
+ gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
+ if (count == 0) {
+ return false;
+ } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
+ break;
+ }
+ }
+ return true;
+}
+
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ GPR_ASSERT(!cqd->shutdown_called);
+ gpr_ref(&cqd->pending_events);
+ return true;
+}
+
+bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ return cq->vtable->begin_op(cq, tag);
+}
+
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
@@ -855,8 +863,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
inconsistent state. If it is the latter, we shold do a 0-timeout poll
so that the thread comes back quickly from poll to make a second
attempt at popping. Not doing this can potentially deadlock this
- thread
- forever (if the deadline is infinity) */
+ thread forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
}
@@ -869,10 +876,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
if (cq_event_queue_num_items(&cqd->queue) > 0) {
/* Go to the beginning of the loop. No point doing a poll because
(cq->shutdown == true) is only possible when there is no pending
- work
- (i.e cq->pending_events == 0) and any outstanding
- grpc_cq_completion
- events are already queued on this cq */
+ work (i.e cq->pending_events == 0) and any outstanding completion
+ events should have already been queued on this cq */
continue;
}
@@ -909,11 +914,6 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
is_finished_arg.first_loop = false;
}
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
-
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
@@ -921,6 +921,11 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
gpr_mu_unlock(cq->mu);
}
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+
GPR_TIMER_END("grpc_completion_queue_next", 0);
return ret;
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index af44482513..69d144bd95 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
- \a tag is currently used only in debug builds. */
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
+ \a tag is currently used only in debug builds. Return true on success, and
+ false if completion_queue has been shutdown. */
+bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index fce7f8dca1..66dcc299aa 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* stay locked, and gather up some stuff to do */
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
NULL, gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
@@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index 1280680663..810447313c 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -391,7 +391,7 @@ static tsi_result fake_handshaker_result_create_frame_protector(
}
static tsi_result fake_handshaker_result_get_unused_bytes(
- const tsi_handshaker_result *self, unsigned char **bytes,
+ const tsi_handshaker_result *self, const unsigned char **bytes,
size_t *bytes_size) {
fake_handshaker_result *result = (fake_handshaker_result *)self;
*bytes_size = result->unused_bytes_size;
diff --git a/src/core/tsi/transport_security.c b/src/core/tsi/transport_security.c
index be11d64472..2b1f4310c1 100644
--- a/src/core/tsi/transport_security.c
+++ b/src/core/tsi/transport_security.c
@@ -240,7 +240,7 @@ tsi_result tsi_handshaker_result_create_frame_protector(
}
tsi_result tsi_handshaker_result_get_unused_bytes(
- const tsi_handshaker_result *self, unsigned char **bytes,
+ const tsi_handshaker_result *self, const unsigned char **bytes,
size_t *bytes_size) {
if (self == NULL || bytes == NULL || bytes_size == NULL) {
return TSI_INVALID_ARGUMENT;
diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h
index 4a56c25602..2c7db6bca9 100644
--- a/src/core/tsi/transport_security.h
+++ b/src/core/tsi/transport_security.h
@@ -90,7 +90,8 @@ typedef struct {
size_t *max_output_protected_frame_size,
tsi_frame_protector **protector);
tsi_result (*get_unused_bytes)(const tsi_handshaker_result *self,
- unsigned char **bytes, size_t *bytes_size);
+ const unsigned char **bytes,
+ size_t *bytes_size);
void (*destroy)(tsi_handshaker_result *self);
} tsi_handshaker_result_vtable;
diff --git a/src/core/tsi/transport_security_adapter.c b/src/core/tsi/transport_security_adapter.c
index a0564945e4..b6dc660c47 100644
--- a/src/core/tsi/transport_security_adapter.c
+++ b/src/core/tsi/transport_security_adapter.c
@@ -50,7 +50,7 @@ static tsi_result adapter_result_create_frame_protector(
}
static tsi_result adapter_result_get_unused_bytes(
- const tsi_handshaker_result *self, unsigned char **bytes,
+ const tsi_handshaker_result *self, const unsigned char **bytes,
size_t *byte_size) {
tsi_adapter_handshaker_result *impl = (tsi_adapter_handshaker_result *)self;
*bytes = impl->unused_bytes;
diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h
index 137f8ee5c3..39ba8addc4 100644
--- a/src/core/tsi/transport_security_interface.h
+++ b/src/core/tsi/transport_security_interface.h
@@ -221,7 +221,7 @@ tsi_result tsi_handshaker_result_create_frame_protector(
Ownership of the bytes is retained by the handshaker result. As a
consequence, the caller must not free the bytes. */
tsi_result tsi_handshaker_result_get_unused_bytes(
- const tsi_handshaker_result *self, unsigned char **bytes,
+ const tsi_handshaker_result *self, const unsigned char **bytes,
size_t *byte_size);
/* This method releases the tsi_handshaker_handshaker object. After this method