aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c1
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c78
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c79
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h27
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c44
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c22
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h48
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c6
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c2
-rw-r--r--src/core/ext/filters/client_channel/retry_throttle.c22
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.c88
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.c514
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c202
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c195
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h23
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c85
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.c33
-rw-r--r--src/core/lib/iomgr/nameser.h104
-rw-r--r--src/core/lib/iomgr/port.h5
-rw-r--r--src/core/lib/support/avl.c165
-rw-r--r--src/core/lib/surface/alarm.c3
-rw-r--r--src/core/lib/surface/call.c6
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c87
-rw-r--r--src/core/lib/surface/completion_queue.h5
-rw-r--r--src/core/lib/surface/server.c14
-rw-r--r--src/core/lib/transport/byte_stream.c128
-rw-r--r--src/core/lib/transport/byte_stream.h99
-rw-r--r--src/core/lib/transport/transport.c24
-rw-r--r--src/core/lib/transport/transport.h9
33 files changed, 1365 insertions, 761 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index c3dca14305..b83c95275f 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state(
7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, cq, tag));
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
gpr_mu_init(&w->mu);
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 52c6e38c87..568bb2ba8d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -88,7 +88,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
// Record call finished, optionally setting client_failed_to_send and
// received.
grpc_grpclb_client_stats_add_call_finished(
- false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */,
calld->client_stats);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index ebce801b37..bb9217d843 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -416,9 +416,7 @@ struct rr_connectivity_data {
static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
bool log) {
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
- return false;
- }
+ if (server->drop) return false;
const grpc_grpclb_ip_address *ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
@@ -462,7 +460,7 @@ static const grpc_lb_user_data_vtable lb_token_vtable = {
static void parse_server(const grpc_grpclb_server *server,
grpc_resolved_address *addr) {
memset(addr, 0, sizeof(*addr));
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
+ if (server->drop) return;
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
@@ -610,7 +608,7 @@ static bool pick_from_internal_rr_locked(
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
glb_policy->serverlist_index = 0; // Wrap-around.
}
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+ if (server->drop) {
// Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
@@ -622,11 +620,8 @@ static bool pick_from_internal_rr_locked(
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
- grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
- grpc_grpclb_client_stats_add_call_finished(
- server->drop_for_rate_limiting, server->drop_for_load_balancing,
- false /* failed_to_send */, false /* known_received */,
- wc_arg->client_stats);
+ grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
+ wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
@@ -715,7 +710,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
return;
}
glb_policy->rr_policy = new_rr_policy;
-
grpc_error *rr_state_error = NULL;
const grpc_connectivity_state rr_state =
grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
@@ -741,7 +735,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
rr_connectivity->state = rr_state;
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
@@ -806,32 +800,31 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = arg;
glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
-
- const bool shutting_down = glb_policy->shutting_down;
- bool unref_needed = false;
- GRPC_ERROR_REF(error);
-
- if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
- /* RR policy shutting down. Don't renew subscription and free the arg of
- * this callback. In addition we need to stash away the current policy to
- * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
- * one, the policy would be destroyed, alongside the lock, which would
- * result in a use-after-free */
- unref_needed = true;
+ if (glb_policy->shutting_down) {
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ "glb_rr_connectivity_cb");
gpr_free(rr_connectivity);
- } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
- update_lb_connectivity_status_locked(
- exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
- /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
- grpc_lb_policy_notify_on_state_change_locked(
- exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
- &rr_connectivity->on_change);
+ return;
}
- if (unref_needed) {
+ if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
+ /* An RR policy that has transitioned into the SHUTDOWN connectivity state
+ * should not be considered for picks or updates: the SHUTDOWN state is a
+ * sink, policies can't transition back from it. .*/
+ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
+ "rr_connectivity_shutdown");
+ glb_policy->rr_policy = NULL;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "rr_connectivity_cb");
+ "glb_rr_connectivity_cb");
+ gpr_free(rr_connectivity);
+ return;
}
- GRPC_ERROR_UNREF(error);
+ /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
+ update_lb_connectivity_status_locked(
+ exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
+ /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
+ grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+ &rr_connectivity->state,
+ &rr_connectivity->on_change);
}
static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
@@ -995,7 +988,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
gpr_free(glb_policy);
return NULL;
}
-
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
@@ -1052,7 +1044,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_policy->pending_picks = NULL;
pending_ping *pping = glb_policy->pending_pings;
glb_policy->pending_pings = NULL;
- if (glb_policy->rr_policy) {
+ if (glb_policy->rr_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
}
// We destroy the LB channel here because
@@ -1309,15 +1301,14 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
}
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
+ grpc_grpclb_dropped_call_counts *drop_entries =
+ request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
- request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
- 0 &&
- request->client_stats
- .num_calls_finished_with_drop_for_load_balancing == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
0 &&
- request->client_stats.num_calls_finished_known_received == 0;
+ request->client_stats.num_calls_finished_known_received == 0 &&
+ (drop_entries == NULL || drop_entries->num_entries == 0);
}
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1332,7 +1323,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
// Construct message payload.
GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
grpc_grpclb_request *request =
- grpc_grpclb_load_report_request_create(glb_policy->client_stats);
+ grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {
@@ -1778,7 +1769,8 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
if (!glb_policy->watching_lb_channel) {
// Watch the LB channel connectivity for connection.
- glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
+ glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
+ glb_policy->lb_channel, true /* try to connect */);
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
index c762443b7c..5b62623145 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
@@ -18,8 +18,11 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@@ -29,10 +32,11 @@
struct grpc_grpclb_client_stats {
gpr_refcount refs;
+ // This field must only be accessed via *_locked() methods.
+ grpc_grpclb_dropped_call_counts* drop_token_counts;
+ // These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started;
gpr_atm num_calls_finished;
- gpr_atm num_calls_finished_with_drop_for_rate_limiting;
- gpr_atm num_calls_finished_with_drop_for_load_balancing;
gpr_atm num_calls_finished_with_client_failed_to_send;
gpr_atm num_calls_finished_known_received;
};
@@ -51,6 +55,7 @@ grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
if (gpr_unref(&client_stats->refs)) {
+ grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
gpr_free(client_stats);
}
}
@@ -61,21 +66,9 @@ void grpc_grpclb_client_stats_add_call_started(
}
void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_drop_for_rate_limiting,
- bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
- if (finished_with_drop_for_rate_limiting) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_drop_for_rate_limiting,
- (gpr_atm)1);
- }
- if (finished_with_drop_for_load_balancing) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_drop_for_load_balancing,
- (gpr_atm)1);
- }
if (finished_with_client_failed_to_send) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_client_failed_to_send,
@@ -87,32 +80,70 @@ void grpc_grpclb_client_stats_add_call_finished(
}
}
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+ char* token, grpc_grpclb_client_stats* client_stats) {
+ // Increment num_calls_started and num_calls_finished.
+ gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
+ gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
+ // Record the drop.
+ if (client_stats->drop_token_counts == NULL) {
+ client_stats->drop_token_counts =
+ gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts));
+ }
+ grpc_grpclb_dropped_call_counts* drop_token_counts =
+ client_stats->drop_token_counts;
+ for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
+ if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
+ ++drop_token_counts->token_counts[i].count;
+ return;
+ }
+ }
+ // Not found, so add a new entry. We double the size of the array each time.
+ size_t new_num_entries = 2;
+ while (new_num_entries < drop_token_counts->num_entries + 1) {
+ new_num_entries *= 2;
+ }
+ drop_token_counts->token_counts =
+ gpr_realloc(drop_token_counts->token_counts,
+ new_num_entries * sizeof(grpc_grpclb_drop_token_count));
+ grpc_grpclb_drop_token_count* new_entry =
+ &drop_token_counts->token_counts[drop_token_counts->num_entries++];
+ new_entry->token = gpr_strdup(token);
+ new_entry->count = 1;
+}
+
static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
*value = (int64_t)gpr_atm_acq_load(counter);
gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
}
-void grpc_grpclb_client_stats_get(
+void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_drop_for_rate_limiting,
- int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received) {
+ int64_t* num_calls_finished_known_received,
+ grpc_grpclb_dropped_call_counts** drop_token_counts) {
atomic_get_and_reset_counter(num_calls_started,
&client_stats->num_calls_started);
atomic_get_and_reset_counter(num_calls_finished,
&client_stats->num_calls_finished);
atomic_get_and_reset_counter(
- num_calls_finished_with_drop_for_rate_limiting,
- &client_stats->num_calls_finished_with_drop_for_rate_limiting);
- atomic_get_and_reset_counter(
- num_calls_finished_with_drop_for_load_balancing,
- &client_stats->num_calls_finished_with_drop_for_load_balancing);
- atomic_get_and_reset_counter(
num_calls_finished_with_client_failed_to_send,
&client_stats->num_calls_finished_with_client_failed_to_send);
atomic_get_and_reset_counter(
num_calls_finished_known_received,
&client_stats->num_calls_finished_known_received);
+ *drop_token_counts = client_stats->drop_token_counts;
+ client_stats->drop_token_counts = NULL;
+}
+
+void grpc_grpclb_dropped_call_counts_destroy(
+ grpc_grpclb_dropped_call_counts* drop_entries) {
+ if (drop_entries != NULL) {
+ for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ gpr_free(drop_entries->token_counts[i].token);
+ }
+ gpr_free(drop_entries->token_counts);
+ gpr_free(drop_entries);
+ }
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index 4bb47d5c5c..c51e2a431a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -25,6 +25,16 @@
typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
+typedef struct {
+ char* token;
+ int64_t count;
+} grpc_grpclb_drop_token_count;
+
+typedef struct {
+ grpc_grpclb_drop_token_count* token_counts;
+ size_t num_entries;
+} grpc_grpclb_dropped_call_counts;
+
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats);
@@ -33,18 +43,23 @@ void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_drop_for_rate_limiting,
- bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats);
-void grpc_grpclb_client_stats_get(
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+ char* token, grpc_grpclb_client_stats* client_stats);
+
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_drop_for_rate_limiting,
- int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received);
+ int64_t* num_calls_finished_known_received,
+ grpc_grpclb_dropped_call_counts** drop_token_counts);
+
+void grpc_grpclb_dropped_call_counts_destroy(
+ grpc_grpclb_dropped_call_counts* drop_entries);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index bec7c97a78..6fa29f326e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -76,7 +76,33 @@ static void populate_timestamp(gpr_timespec timestamp,
timestamp_pb->nanos = timestamp.tv_nsec;
}
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+static bool encode_string(pb_ostream_t *stream, const pb_field_t *field,
+ void *const *arg) {
+ char *str = *arg;
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ return pb_encode_string(stream, (uint8_t *)str, strlen(str));
+}
+
+static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field,
+ void *const *arg) {
+ grpc_grpclb_dropped_call_counts *drop_entries = *arg;
+ if (drop_entries == NULL) return true;
+ for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ grpc_lb_v1_ClientStatsPerToken drop_message;
+ drop_message.load_balance_token.funcs.encode = encode_string;
+ drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
+ drop_message.has_num_calls = true;
+ drop_message.num_calls = drop_entries->token_counts[i].count;
+ if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
+ &drop_message)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats) {
grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request));
req->has_client_stats = true;
@@ -84,18 +110,17 @@ grpc_grpclb_request *grpc_grpclb_load_report_request_create(
populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
req->client_stats.has_num_calls_started = true;
req->client_stats.has_num_calls_finished = true;
- req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true;
- req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_known_received = true;
- grpc_grpclb_client_stats_get(
+ req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
+ grpc_grpclb_client_stats_get_locked(
client_stats, &req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
- &req->client_stats.num_calls_finished_with_drop_for_rate_limiting,
- &req->client_stats.num_calls_finished_with_drop_for_load_balancing,
&req->client_stats.num_calls_finished_with_client_failed_to_send,
- &req->client_stats.num_calls_finished_known_received);
+ &req->client_stats.num_calls_finished_known_received,
+ (grpc_grpclb_dropped_call_counts **)&req->client_stats
+ .calls_finished_with_drop.arg);
return req;
}
@@ -117,6 +142,11 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
}
void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
+ if (request->has_client_stats) {
+ grpc_grpclb_dropped_call_counts *drop_entries =
+ request->client_stats.calls_finished_with_drop.arg;
+ grpc_grpclb_dropped_call_counts_destroy(drop_entries);
+ }
gpr_free(request);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index ef8d563edc..c4a98492c9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -44,7 +44,7 @@ typedef struct {
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name);
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats);
/** Protocol Buffers v3-encode \a request */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index fb119c7fc8..6a5d54c82a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -33,14 +33,19 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
+const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = {
+ PB_FIELD( 1, STRING , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0),
+ PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0),
+ PB_LAST_FIELD
+};
+
+const pb_field_t grpc_lb_v1_ClientStats_fields[7] = {
PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
- PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
- PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
- PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
+ PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0),
PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
+ PB_FIELD( 8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields),
PB_LAST_FIELD
};
@@ -62,12 +67,11 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_Server_fields[6] = {
+const pb_field_t grpc_lb_v1_Server_fields[5] = {
PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
- PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
- PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
+ PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0),
PB_LAST_FIELD
};
@@ -81,7 +85,7 @@ const pb_field_t grpc_lb_v1_Server_fields[6] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -92,7 +96,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index d3ae919ec2..93333d1aed 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,6 +14,13 @@ extern "C" {
#endif
/* Struct definitions */
+typedef struct _grpc_lb_v1_ClientStatsPerToken {
+ pb_callback_t load_balance_token;
+ bool has_num_calls;
+ int64_t num_calls;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */
+} grpc_lb_v1_ClientStatsPerToken;
+
typedef struct _grpc_lb_v1_Duration {
bool has_seconds;
int64_t seconds;
@@ -36,10 +43,8 @@ typedef struct _grpc_lb_v1_Server {
int32_t port;
bool has_load_balance_token;
char load_balance_token[50];
- bool has_drop_for_rate_limiting;
- bool drop_for_rate_limiting;
- bool has_drop_for_load_balancing;
- bool drop_for_load_balancing;
+ bool has_drop;
+ bool drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
@@ -58,14 +63,11 @@ typedef struct _grpc_lb_v1_ClientStats {
int64_t num_calls_started;
bool has_num_calls_finished;
int64_t num_calls_finished;
- bool has_num_calls_finished_with_drop_for_rate_limiting;
- int64_t num_calls_finished_with_drop_for_rate_limiting;
- bool has_num_calls_finished_with_drop_for_load_balancing;
- int64_t num_calls_finished_with_drop_for_load_balancing;
bool has_num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_with_client_failed_to_send;
bool has_num_calls_finished_known_received;
int64_t num_calls_finished_known_received;
+ pb_callback_t calls_finished_with_drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
@@ -107,39 +109,41 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1
+#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2
#define grpc_lb_v1_Duration_seconds_tag 1
#define grpc_lb_v1_Duration_nanos_tag 2
#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
#define grpc_lb_v1_Server_ip_address_tag 1
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
-#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_Server_drop_tag 4
#define grpc_lb_v1_Timestamp_seconds_tag 1
#define grpc_lb_v1_Timestamp_nanos_tag 2
#define grpc_lb_v1_ClientStats_timestamp_tag 1
#define grpc_lb_v1_ClientStats_num_calls_started_tag 2
#define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
#define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
#define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
+#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
#define grpc_lb_v1_ServerList_servers_tag 1
@@ -154,22 +158,24 @@ extern const pb_field_t grpc_lb_v1_Duration_fields[3];
extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
+extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[7];
extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v1_Server_fields[6];
+extern const pb_field_t grpc_lb_v1_Server_fields[5];
/* Maximum encoded size of messages (where known) */
#define grpc_lb_v1_Duration_size 22
#define grpc_lb_v1_Timestamp_size 22
-#define grpc_lb_v1_LoadBalanceRequest_size 226
+#define grpc_lb_v1_LoadBalanceRequest_size (140 + grpc_lb_v1_ClientStats_size)
#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v1_ClientStats_size 90
+/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */
+/* grpc_lb_v1_ClientStats_size depends on runtime parameters */
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
/* grpc_lb_v1_ServerList_size depends on runtime parameters */
-#define grpc_lb_v1_Server_size 85
+#define grpc_lb_v1_Server_size 83
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index bc40165cfb..a7f7e9542c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -74,6 +74,9 @@ typedef struct round_robin_lb_policy {
bool started_picking;
/** are we shutting down? */
bool shutdown;
+ /** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN? No picks can be
+ * service after this point, the policy will never transition out. */
+ bool in_connectivity_shutdown;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
@@ -420,6 +423,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(!p->in_connectivity_shutdown);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
}
@@ -532,6 +537,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
+ p->in_connectivity_shutdown = true;
new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
index 9065e33613..6ec3790a5f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -33,13 +33,13 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
-#include <nameser.h>
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/nameser.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
diff --git a/src/core/ext/filters/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c
index 3009e21d49..0c7a3ae651 100644
--- a/src/core/ext/filters/client_channel/retry_throttle.c
+++ b/src/core/ext/filters/client_channel/retry_throttle.c
@@ -130,24 +130,28 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
// avl vtable for string -> server_retry_throttle_data map
//
-static void* copy_server_name(void* key) { return gpr_strdup(key); }
+static void* copy_server_name(void* key, void* unused) {
+ return gpr_strdup(key);
+}
-static long compare_server_name(void* key1, void* key2) {
+static long compare_server_name(void* key1, void* key2, void* unused) {
return strcmp(key1, key2);
}
-static void destroy_server_retry_throttle_data(void* value) {
+static void destroy_server_retry_throttle_data(void* value, void* unused) {
grpc_server_retry_throttle_data* throttle_data = value;
grpc_server_retry_throttle_data_unref(throttle_data);
}
-static void* copy_server_retry_throttle_data(void* value) {
+static void* copy_server_retry_throttle_data(void* value, void* unused) {
grpc_server_retry_throttle_data* throttle_data = value;
return grpc_server_retry_throttle_data_ref(throttle_data);
}
+static void destroy_server_name(void* key, void* unused) { gpr_free(key); }
+
static const gpr_avl_vtable avl_vtable = {
- gpr_free /* destroy_key */, copy_server_name, compare_server_name,
+ destroy_server_name, copy_server_name, compare_server_name,
destroy_server_retry_throttle_data, copy_server_retry_throttle_data};
//
@@ -164,19 +168,19 @@ void grpc_retry_throttle_map_init() {
void grpc_retry_throttle_map_shutdown() {
gpr_mu_destroy(&g_mu);
- gpr_avl_unref(g_avl);
+ gpr_avl_unref(g_avl, NULL);
}
grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
const char* server_name, int max_milli_tokens, int milli_token_ratio) {
gpr_mu_lock(&g_mu);
grpc_server_retry_throttle_data* throttle_data =
- gpr_avl_get(g_avl, (char*)server_name);
+ gpr_avl_get(g_avl, (char*)server_name, NULL);
if (throttle_data == NULL) {
// Entry not found. Create a new one.
throttle_data = grpc_server_retry_throttle_data_create(
max_milli_tokens, milli_token_ratio, NULL);
- g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data);
+ g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data, NULL);
} else {
if (throttle_data->max_milli_tokens != max_milli_tokens ||
throttle_data->milli_token_ratio != milli_token_ratio) {
@@ -184,7 +188,7 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
// the original one.
throttle_data = grpc_server_retry_throttle_data_create(
max_milli_tokens, milli_token_ratio, throttle_data);
- g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data);
+ g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data, NULL);
} else {
// Entry found. Increase refcount.
grpc_server_retry_throttle_data_ref(throttle_data);
diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c
index a33ab950bf..ababd05d84 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.c
+++ b/src/core/ext/filters/client_channel/subchannel_index.c
@@ -38,26 +38,8 @@ struct grpc_subchannel_key {
grpc_subchannel_args args;
};
-GPR_TLS_DECL(subchannel_index_exec_ctx);
-
static bool g_force_creation = false;
-static void enter_ctx(grpc_exec_ctx *exec_ctx) {
- GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0);
- gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx);
-}
-
-static void leave_ctx(grpc_exec_ctx *exec_ctx) {
- GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == (intptr_t)exec_ctx);
- gpr_tls_set(&subchannel_index_exec_ctx, 0);
-}
-
-static grpc_exec_ctx *current_ctx() {
- grpc_exec_ctx *c = (grpc_exec_ctx *)gpr_tls_get(&subchannel_index_exec_ctx);
- GPR_ASSERT(c != NULL);
- return c;
-}
-
static grpc_subchannel_key *create_key(
const grpc_subchannel_args *args,
grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) {
@@ -104,21 +86,25 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
gpr_free(k);
}
-static void sck_avl_destroy(void *p) {
- grpc_subchannel_key_destroy(current_ctx(), p);
+static void sck_avl_destroy(void *p, void *user_data) {
+ grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
+ grpc_subchannel_key_destroy(exec_ctx, p);
}
-static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); }
+static void *sck_avl_copy(void *p, void *unused) {
+ return subchannel_key_copy(p);
+}
-static long sck_avl_compare(void *a, void *b) {
+static long sck_avl_compare(void *a, void *b, void *unused) {
return grpc_subchannel_key_compare(a, b);
}
-static void scv_avl_destroy(void *p) {
- GRPC_SUBCHANNEL_WEAK_UNREF(current_ctx(), p, "subchannel_index");
+static void scv_avl_destroy(void *p, void *user_data) {
+ grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index");
}
-static void *scv_avl_copy(void *p) {
+static void *scv_avl_copy(void *p, void *unused) {
GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index");
return p;
}
@@ -133,38 +119,33 @@ static const gpr_avl_vtable subchannel_avl_vtable = {
void grpc_subchannel_index_init(void) {
g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable);
gpr_mu_init(&g_mu);
- gpr_tls_init(&subchannel_index_exec_ctx);
}
void grpc_subchannel_index_shutdown(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_destroy(&g_mu);
- gpr_avl_unref(g_subchannel_index);
- gpr_tls_destroy(&subchannel_index_exec_ctx);
+ gpr_avl_unref(g_subchannel_index, &exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key) {
- enter_ctx(exec_ctx);
-
// Lock, and take a reference to the subchannel index.
// We don't need to do the search under a lock as avl's are immutable.
gpr_mu_lock(&g_mu);
- gpr_avl index = gpr_avl_ref(g_subchannel_index);
+ gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
gpr_mu_unlock(&g_mu);
- grpc_subchannel *c =
- GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key), "index_find");
- gpr_avl_unref(index);
+ grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
+ gpr_avl_get(index, key, exec_ctx), "index_find");
+ gpr_avl_unref(index, exec_ctx);
- leave_ctx(exec_ctx);
return c;
}
grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed) {
- enter_ctx(exec_ctx);
-
grpc_subchannel *c = NULL;
bool need_to_unref_constructed;
@@ -174,11 +155,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
// Compare and swap loop:
// - take a reference to the current index
gpr_mu_lock(&g_mu);
- gpr_avl index = gpr_avl_ref(g_subchannel_index);
+ gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
gpr_mu_unlock(&g_mu);
// - Check to see if a subchannel already exists
- c = gpr_avl_get(index, key);
+ c = gpr_avl_get(index, key, exec_ctx);
if (c != NULL) {
c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register");
}
@@ -187,9 +168,9 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
need_to_unref_constructed = true;
} else {
// no -> update the avl and compare/swap
- gpr_avl updated =
- gpr_avl_add(gpr_avl_ref(index), subchannel_key_copy(key),
- GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"));
+ gpr_avl updated = gpr_avl_add(
+ gpr_avl_ref(index, exec_ctx), subchannel_key_copy(key),
+ GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), exec_ctx);
// it may happen (but it's expected to be unlikely)
// that some other thread has changed the index:
@@ -201,13 +182,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
}
gpr_mu_unlock(&g_mu);
- gpr_avl_unref(updated);
+ gpr_avl_unref(updated, exec_ctx);
}
- gpr_avl_unref(index);
+ gpr_avl_unref(index, exec_ctx);
}
- leave_ctx(exec_ctx);
-
if (need_to_unref_constructed) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, constructed, "index_register");
}
@@ -218,27 +197,26 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed) {
- enter_ctx(exec_ctx);
-
bool done = false;
while (!done) {
// Compare and swap loop:
// - take a reference to the current index
gpr_mu_lock(&g_mu);
- gpr_avl index = gpr_avl_ref(g_subchannel_index);
+ gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
gpr_mu_unlock(&g_mu);
// Check to see if this key still refers to the previously
// registered subchannel
- grpc_subchannel *c = gpr_avl_get(index, key);
+ grpc_subchannel *c = gpr_avl_get(index, key, exec_ctx);
if (c != constructed) {
- gpr_avl_unref(index);
+ gpr_avl_unref(index, exec_ctx);
break;
}
// compare and swap the update (some other thread may have
// mutated the index behind us)
- gpr_avl updated = gpr_avl_remove(gpr_avl_ref(index), key);
+ gpr_avl updated =
+ gpr_avl_remove(gpr_avl_ref(index, exec_ctx), key, exec_ctx);
gpr_mu_lock(&g_mu);
if (index.root == g_subchannel_index.root) {
@@ -247,11 +225,9 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
}
gpr_mu_unlock(&g_mu);
- gpr_avl_unref(updated);
- gpr_avl_unref(index);
+ gpr_avl_unref(updated, exec_ctx);
+ gpr_avl_unref(index, exec_ctx);
}
-
- leave_ctx(exec_ctx);
}
void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 90f0aed7a0..3ca01a41b5 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -36,41 +36,29 @@
static const size_t kMaxPayloadSizeForGet = 2048;
typedef struct call_data {
+ // State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
grpc_linked_mdelem authority;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent;
-
+ // State for handling recv_initial_metadata ops.
grpc_metadata_batch *recv_initial_metadata;
+ grpc_closure *original_recv_initial_metadata_ready;
+ grpc_closure recv_initial_metadata_ready;
+ // State for handling recv_trailing_metadata ops.
grpc_metadata_batch *recv_trailing_metadata;
- uint8_t *payload_bytes;
-
- /* Vars to read data off of send_message */
- grpc_transport_stream_op_batch *send_op;
- uint32_t send_length;
- uint32_t send_flags;
- grpc_slice incoming_slice;
- grpc_slice_buffer_stream replacement_stream;
- grpc_slice_buffer slices;
- /* flag that indicates that all slices of send_messages aren't availble */
- bool send_message_blocked;
-
- /** Closure to call when finished with the hc_on_recv hook */
- grpc_closure *on_done_recv_initial_metadata;
- grpc_closure *on_done_recv_trailing_metadata;
- grpc_closure *on_complete;
- grpc_closure *post_send;
-
- /** Receive closures are chained: we inject this closure as the on_done_recv
- up-call on transport_op, and remember to call our on_done_recv member
- after handling it. */
- grpc_closure hc_on_recv_initial_metadata;
- grpc_closure hc_on_recv_trailing_metadata;
- grpc_closure hc_on_complete;
- grpc_closure got_slice;
- grpc_closure send_done;
+ grpc_closure *original_recv_trailing_metadata_on_complete;
+ grpc_closure recv_trailing_metadata_on_complete;
+ // State for handling send_message ops.
+ grpc_transport_stream_op_batch *send_message_batch;
+ size_t send_message_bytes_read;
+ grpc_byte_stream_cache send_message_cache;
+ grpc_caching_byte_stream send_message_caching_stream;
+ grpc_closure on_send_message_next_done;
+ grpc_closure *original_send_message_on_complete;
+ grpc_closure send_message_on_complete;
} call_data;
typedef struct channel_data {
@@ -148,7 +136,7 @@ static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
+static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
void *user_data, grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
@@ -158,11 +146,13 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
}
-static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
- void *user_data, grpc_error *error) {
+static void recv_trailing_metadata_on_complete(grpc_exec_ctx *exec_ctx,
+ void *user_data,
+ grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (error == GRPC_ERROR_NONE) {
@@ -171,25 +161,131 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_trailing_metadata, error);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_trailing_metadata_on_complete,
+ error);
}
-static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *error) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- if (calld->payload_bytes) {
- gpr_free(calld->payload_bytes);
- calld->payload_bytes = NULL;
+static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ grpc_byte_stream_cache_destroy(exec_ctx, &calld->send_message_cache);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
+ GRPC_ERROR_REF(error));
+}
+
+// Pulls a slice from the send_message byte stream, updating
+// calld->send_message_bytes_read.
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
+ call_data *calld) {
+ grpc_slice incoming_slice;
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, &calld->send_message_caching_stream.base, &incoming_slice);
+ if (error == GRPC_ERROR_NONE) {
+ calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice);
+ grpc_slice_unref_internal(exec_ctx, incoming_slice);
}
- calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error);
+ return error;
}
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
- calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+// Reads as many slices as possible from the send_message byte stream.
+// Upon successful return, if calld->send_message_bytes_read ==
+// calld->send_message_caching_stream.base.length, then we have completed
+// reading from the byte stream; otherwise, an async read has been dispatched
+// and on_send_message_next_done() will be invoked when it is complete.
+static grpc_error *read_all_available_send_message_data(grpc_exec_ctx *exec_ctx,
+ call_data *calld) {
+ while (grpc_byte_stream_next(exec_ctx,
+ &calld->send_message_caching_stream.base,
+ ~(size_t)0, &calld->on_send_message_next_done)) {
+ grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) return error;
+ if (calld->send_message_bytes_read ==
+ calld->send_message_caching_stream.base.length) {
+ break;
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+// Async callback for grpc_byte_stream_next().
+static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ return;
+ }
+ error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ return;
+ }
+ // There may or may not be more to read, but we don't care. If we got
+ // here, then we know that all of the data was not available
+ // synchronously, so we were not able to do a cached call. Instead,
+ // we just reset the byte stream and then send down the batch as-is.
+ grpc_caching_byte_stream_reset(&calld->send_message_caching_stream);
+ grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
+}
+
+static char *slice_buffer_to_string(grpc_slice_buffer *slice_buffer) {
+ char *payload_bytes = gpr_malloc(slice_buffer->length + 1);
+ size_t offset = 0;
+ for (size_t i = 0; i < slice_buffer->count; ++i) {
+ memcpy(payload_bytes + offset,
+ GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
+ GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
+ offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
+ }
+ *(payload_bytes + offset) = '\0';
+ return payload_bytes;
+}
+
+// Modifies the path entry in the batch's send_initial_metadata to
+// append the base64-encoded query for a GET request.
+static grpc_error *update_path_for_get(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch) {
+ call_data *calld = (call_data *)elem->call_data;
+ grpc_slice path_slice =
+ GRPC_MDVALUE(batch->payload->send_initial_metadata.send_initial_metadata
+ ->idx.named.path->md);
+ /* sum up individual component's lengths and allocate enough memory to
+ * hold combined path+query */
+ size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
+ estimated_len++; /* for the '?' */
+ estimated_len += grpc_base64_estimate_encoded_size(
+ batch->payload->send_message.send_message->length, true /* url_safe */,
+ false /* multi_line */);
+ grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
+ /* memcopy individual pieces into this slice */
+ char *write_ptr = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
+ char *original_path = (char *)GRPC_SLICE_START_PTR(path_slice);
+ memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
+ write_ptr += GRPC_SLICE_LENGTH(path_slice);
+ *write_ptr++ = '?';
+ char *payload_bytes =
+ slice_buffer_to_string(&calld->send_message_cache.cache_buffer);
+ grpc_base64_encode_core((char *)write_ptr, payload_bytes,
+ batch->payload->send_message.send_message->length,
+ true /* url_safe */, false /* multi_line */);
+ gpr_free(payload_bytes);
+ /* remove trailing unused memory and add trailing 0 to terminate string */
+ char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
+ /* safe to use strlen since base64_encode will always add '\0' */
+ path_with_query_slice =
+ grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
+ /* substitute previous path with the new path+query */
+ grpc_mdelem mdelem_path_and_query =
+ grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
+ grpc_metadata_batch *b =
+ batch->payload->send_initial_metadata.send_initial_metadata;
+ return grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
+ mdelem_path_and_query);
}
static void remove_if_present(grpc_exec_ctx *exec_ctx,
@@ -200,273 +296,153 @@ static void remove_if_present(grpc_exec_ctx *exec_ctx,
}
}
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void hc_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
- uint8_t *wrptr = calld->payload_bytes;
- while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
- if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) {
- memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
- GRPC_SLICE_LENGTH(calld->incoming_slice));
- }
- wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- calld->send_message_blocked = false;
- break;
- }
- }
-}
+ channel_data *channeld = elem->channel_data;
+ GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- calld->send_message_blocked = false;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
- }
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- /* Pass down the original send_message op that was blocked.*/
- grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- calld->send_flags);
- calld->send_op->payload->send_message.send_message =
- &calld->replacement_stream.base;
- calld->post_send = calld->send_op->on_complete;
- calld->send_op->on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, calld->send_op);
- } else {
- continue_send_message(exec_ctx, elem);
+ if (batch->recv_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_initial_metadata =
+ batch->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->original_recv_initial_metadata_ready =
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->recv_initial_metadata_ready;
}
-}
-static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- grpc_error *error;
+ if (batch->recv_trailing_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
+ batch->on_complete = &calld->recv_trailing_metadata_on_complete;
+ }
- if (op->send_initial_metadata) {
- /* Decide which HTTP VERB to use. We use GET if the request is marked
- cacheable, and the operation contains both initial metadata and send
- message, and the payload is below the size threshold, and all the data
- for this request is immediately available. */
+ grpc_error *error = GRPC_ERROR_NONE;
+ bool batch_will_be_handled_asynchronously = false;
+ if (batch->send_initial_metadata) {
+ // Decide which HTTP VERB to use. We use GET if the request is marked
+ // cacheable, and the operation contains both initial metadata and send
+ // message, and the payload is below the size threshold, and all the data
+ // for this request is immediately available.
grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
- if (op->send_message &&
- (op->payload->send_initial_metadata.send_initial_metadata_flags &
+ if (batch->send_message &&
+ (batch->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
- op->payload->send_message.send_message->length <
+ batch->payload->send_message.send_message->length <
channeld->max_payload_size_for_get) {
- method = GRPC_MDELEM_METHOD_GET;
- /* The following write to calld->send_message_blocked isn't racy with
- reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
- being here means ops->send_message is not NULL, which is primarily
- guarding the read there. */
- calld->send_message_blocked = true;
- } else if (op->payload->send_initial_metadata.send_initial_metadata_flags &
- GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
- method = GRPC_MDELEM_METHOD_PUT;
- }
-
- /* Attempt to read the data from send_message and create a header field. */
- if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) {
- /* allocate memory to hold the entire payload */
- calld->payload_bytes =
- gpr_malloc(op->payload->send_message.send_message->length);
-
- /* read slices of send_message and copy into payload_bytes */
- calld->send_op = op;
- calld->send_length = op->payload->send_message.send_message->length;
- calld->send_flags = op->payload->send_message.send_message->flags;
- continue_send_message(exec_ctx, elem);
-
- if (calld->send_message_blocked == false) {
- /* when all the send_message data is available, then modify the path
- * MDELEM by appending base64 encoded query to the path */
- const int k_url_safe = 1;
- const int k_multi_line = 0;
- const unsigned char k_query_separator = '?';
-
- grpc_slice path_slice =
- GRPC_MDVALUE(op->payload->send_initial_metadata
- .send_initial_metadata->idx.named.path->md);
- /* sum up individual component's lengths and allocate enough memory to
- * hold combined path+query */
- size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
- estimated_len++; /* for the '?' */
- estimated_len += grpc_base64_estimate_encoded_size(
- op->payload->send_message.send_message->length, k_url_safe,
- k_multi_line);
- grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
-
- /* memcopy individual pieces into this slice */
- uint8_t *write_ptr =
- (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice);
- uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice);
- memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
- write_ptr += GRPC_SLICE_LENGTH(path_slice);
-
- *write_ptr = k_query_separator;
- write_ptr++; /* for the '?' */
-
- grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes,
- op->payload->send_message.send_message->length,
- k_url_safe, k_multi_line);
-
- /* remove trailing unused memory and add trailing 0 to terminate string
- */
- char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
- /* safe to use strlen since base64_encode will always add '\0' */
- path_with_query_slice =
- grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
-
- /* substitute previous path with the new path+query */
- grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
- grpc_metadata_batch *b =
- op->payload->send_initial_metadata.send_initial_metadata;
- error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
- mdelem_path_and_query);
- if (error != GRPC_ERROR_NONE) return error;
-
- calld->on_complete = op->on_complete;
- op->on_complete = &calld->hc_on_complete;
- op->send_message = false;
+ calld->send_message_bytes_read = 0;
+ grpc_byte_stream_cache_init(&calld->send_message_cache,
+ batch->payload->send_message.send_message);
+ grpc_caching_byte_stream_init(&calld->send_message_caching_stream,
+ &calld->send_message_cache);
+ batch->payload->send_message.send_message =
+ &calld->send_message_caching_stream.base;
+ calld->original_send_message_on_complete = batch->on_complete;
+ batch->on_complete = &calld->send_message_on_complete;
+ calld->send_message_batch = batch;
+ error = read_all_available_send_message_data(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) goto done;
+ // If all the data has been read, then we can use GET.
+ if (calld->send_message_bytes_read ==
+ calld->send_message_caching_stream.base.length) {
+ method = GRPC_MDELEM_METHOD_GET;
+ error = update_path_for_get(exec_ctx, elem, batch);
+ if (error != GRPC_ERROR_NONE) goto done;
+ batch->send_message = false;
+ grpc_byte_stream_destroy(exec_ctx,
+ &calld->send_message_caching_stream.base);
} else {
- /* Not all data is available. Fall back to POST. */
+ // Not all data is available. The batch will be sent down
+ // asynchronously in on_send_message_next_done().
+ batch_will_be_handled_asynchronously = true;
+ // Fall back to POST.
gpr_log(GPR_DEBUG,
- "Request is marked Cacheable but not all data is available.\
- Falling back to POST");
- method = GRPC_MDELEM_METHOD_POST;
+ "Request is marked Cacheable but not all data is available. "
+ "Falling back to POST");
}
+ } else if (batch->payload->send_initial_metadata
+ .send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
+ method = GRPC_MDELEM_METHOD_PUT;
}
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_METHOD);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_SCHEME);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_TE);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_CONTENT_TYPE);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_USER_AGENT);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_METHOD);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_SCHEME);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_TE);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_CONTENT_TYPE);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_USER_AGENT);
/* Send : prefixed headers, which have to be before any application
layer headers. */
error = grpc_metadata_batch_add_head(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->method, method);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_head(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->scheme, channeld->static_scheme);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
}
- if (op->recv_initial_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata;
- calld->on_done_recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata_ready;
- op->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->hc_on_recv_initial_metadata;
- }
-
- if (op->recv_trailing_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_trailing_metadata =
- op->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->on_done_recv_trailing_metadata = op->on_complete;
- op->on_complete = &calld->hc_on_recv_trailing_metadata;
- }
-
- return GRPC_ERROR_NONE;
-}
-
-static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GPR_TIMER_BEGIN("hc_start_transport_op", 0);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_error *error = hc_mutate_op(exec_ctx, elem, op);
+done:
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- } else {
- call_data *calld = elem->call_data;
- if (op->send_message && calld->send_message_blocked) {
- /* Don't forward the op. send_message contains slices that aren't ready
- yet. The call will be forwarded by the op_complete of slice read call.
- */
- } else {
- grpc_call_next_op(exec_ctx, elem, op);
- }
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ } else if (!batch_will_be_handled_asynchronously) {
+ grpc_call_next_op(exec_ctx, elem, batch);
}
- GPR_TIMER_END("hc_start_transport_op", 0);
+ GPR_TIMER_END("hc_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- call_data *calld = elem->call_data;
- calld->on_done_recv_initial_metadata = NULL;
- calld->on_done_recv_trailing_metadata = NULL;
- calld->on_complete = NULL;
- calld->payload_bytes = NULL;
- calld->send_message_blocked = false;
- grpc_slice_buffer_init(&calld->slices);
- GRPC_CLOSURE_INIT(&calld->hc_on_recv_initial_metadata,
- hc_on_recv_initial_metadata, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->hc_on_recv_trailing_metadata,
- hc_on_recv_trailing_metadata, elem,
+ call_data *calld = (call_data *)elem->call_data;
+ GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->hc_on_complete, hc_on_complete, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
+ recv_trailing_metadata_on_complete, elem,
grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
+ elem, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
+ on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- grpc_closure *ignored) {
- call_data *calld = elem->call_data;
- grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
-}
+ grpc_closure *ignored) {}
static grpc_mdelem scheme_from_args(const grpc_channel_args *args) {
unsigned i;
@@ -580,7 +556,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_http_client_filter = {
- hc_start_transport_op,
+ hc_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 71a8bc5bec..20a3488115 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -61,14 +61,11 @@ typedef struct call_data {
pointer | CANCELLED_BIT - request was cancelled with error pointed to */
gpr_atm send_initial_metadata_state;
- grpc_transport_stream_op_batch *send_op;
- uint32_t send_length;
- uint32_t send_flags;
- grpc_slice incoming_slice;
+ grpc_transport_stream_op_batch *send_message_batch;
grpc_slice_buffer_stream replacement_stream;
- grpc_closure *post_send;
- grpc_closure send_done;
- grpc_closure got_slice;
+ grpc_closure *original_send_message_on_complete;
+ grpc_closure send_message_on_complete;
+ grpc_closure on_send_message_next_done;
} call_data;
typedef struct channel_data {
@@ -164,24 +161,25 @@ static grpc_error *process_send_initial_metadata(
return error;
}
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem);
-
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
+static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
- calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
+ GRPC_ERROR_REF(error));
}
static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- int did_compress;
+ call_data *calld = (call_data *)elem->call_data;
+ // Compress the data if appropriate.
grpc_slice_buffer tmp;
grpc_slice_buffer_init(&tmp);
- did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
- &calld->slices, &tmp);
+ uint32_t send_flags =
+ calld->send_message_batch->payload->send_message.send_message->flags;
+ const bool did_compress = grpc_msg_compress(
+ exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -195,7 +193,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
algo_name, before_size, after_size, 100 * savings_ratio);
}
grpc_slice_buffer_swap(&calld->slices, &tmp);
- calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -207,83 +205,118 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
algo_name, calld->slices.length);
}
}
-
grpc_slice_buffer_destroy_internal(exec_ctx, &tmp);
-
+ // Swap out the original byte stream with our new one and send the
+ // batch down.
+ grpc_byte_stream_destroy(
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message);
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- calld->send_flags);
- calld->send_op->payload->send_message.send_message =
+ send_flags);
+ calld->send_message_batch->payload->send_message.send_message =
&calld->replacement_stream.base;
- calld->post_send = calld->send_op->on_complete;
- calld->send_op->on_complete = &calld->send_done;
-
- grpc_call_next_op(exec_ctx, elem, calld->send_op);
+ calld->original_send_message_on_complete =
+ calld->send_message_batch->on_complete;
+ calld->send_message_batch->on_complete = &calld->send_message_on_complete;
+ grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
}
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
- }
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- finish_send_message(exec_ctx, elem);
- } else {
- continue_send_message(exec_ctx, elem);
+// Pulls a slice from the send_message byte stream and adds it to calld->slices.
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
+ call_data *calld) {
+ grpc_slice incoming_slice;
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message,
+ &incoming_slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&calld->slices, incoming_slice);
}
+ return error;
}
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
+// Reads as many slices as possible from the send_message byte stream.
+// If all data has been read, invokes finish_send_message(). Otherwise,
+// an async call to grpc_byte_stream_next() has been started, which will
+// eventually result in calling on_send_message_next_done().
+static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = (call_data *)elem->call_data;
while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message,
+ ~(size_t)0, &calld->on_send_message_next_done)) {
+ grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) return error;
+ if (calld->slices.length ==
+ calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
break;
}
}
+ return GRPC_ERROR_NONE;
}
-static void handle_send_message_batch(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op,
- bool has_compression_algorithm) {
- call_data *calld = elem->call_data;
- if (!skip_compression(elem, op->payload->send_message.send_message->flags,
+// Async callback for grpc_byte_stream_next().
+static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ if (error != GRPC_ERROR_NONE) goto fail;
+ error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) goto fail;
+ if (calld->slices.length ==
+ calld->send_message_batch->payload->send_message.send_message->length) {
+ finish_send_message(exec_ctx, elem);
+ } else {
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // this function again.
+ error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) goto fail;
+ }
+ return;
+fail:
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+}
+
+static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch,
+ bool has_compression_algorithm) {
+ call_data *calld = (call_data *)elem->call_data;
+ if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
has_compression_algorithm)) {
- calld->send_op = op;
- calld->send_length = op->payload->send_message.send_message->length;
- calld->send_flags = op->payload->send_message.send_message->flags;
- continue_send_message(exec_ctx, elem);
+ calld->send_message_batch = batch;
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // on_send_message_next_done().
+ grpc_error *error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ }
} else {
/* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
}
}
static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
- if (op->cancel_stream) {
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
+ if (batch->cancel_stream) {
+ // TODO(roth): As part of the upcoming call combiner work, change
+ // this to call grpc_byte_stream_shutdown() on the incoming byte
+ // stream, to cancel any in-flight calls to grpc_byte_stream_next().
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
gpr_atm cur = gpr_atm_full_xchg(
&calld->send_initial_metadata_state,
- CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error);
+ CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
switch (cur) {
case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM:
@@ -293,7 +326,7 @@ static void compress_start_transport_stream_op_batch(
if ((cur & CANCELLED_BIT) == 0) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, (grpc_transport_stream_op_batch *)cur,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
} else {
GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
}
@@ -301,14 +334,15 @@ static void compress_start_transport_stream_op_batch(
}
}
- if (op->send_initial_metadata) {
+ if (batch->send_initial_metadata) {
bool has_compression_algorithm;
grpc_error *error = process_send_initial_metadata(
exec_ctx, elem,
- op->payload->send_initial_metadata.send_initial_metadata,
+ batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ error);
return;
}
gpr_atm cur;
@@ -324,32 +358,32 @@ static void compress_start_transport_stream_op_batch(
goto retry_send_im;
}
if (cur != INITIAL_METADATA_UNSEEN) {
- handle_send_message_batch(exec_ctx, elem,
- (grpc_transport_stream_op_batch *)cur,
- has_compression_algorithm);
+ start_send_message_batch(exec_ctx, elem,
+ (grpc_transport_stream_op_batch *)cur,
+ has_compression_algorithm);
}
}
}
- if (op->send_message) {
+ if (batch->send_message) {
gpr_atm cur;
retry_send:
cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
switch (cur) {
case INITIAL_METADATA_UNSEEN:
if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
- (gpr_atm)op)) {
+ (gpr_atm)batch)) {
goto retry_send;
}
break;
case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM:
- handle_send_message_batch(exec_ctx, elem, op,
- cur == HAS_COMPRESSION_ALGORITHM);
+ start_send_message_batch(exec_ctx, elem, batch,
+ cur == HAS_COMPRESSION_ALGORITHM);
break;
default:
if (cur & CANCELLED_BIT) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
+ exec_ctx, batch,
GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
} else {
/* >1 send_message concurrently */
@@ -358,7 +392,7 @@ static void compress_start_transport_stream_op_batch(
}
} else {
/* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
}
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
@@ -373,10 +407,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */
grpc_slice_buffer_init(&calld->slices);
- GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
+ on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
+ elem, grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 731ebf400f..dc35f4855c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -730,6 +730,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx,
&s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
+ if (s->compressed_data_buffer) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer);
+ gpr_free(s->compressed_data_buffer);
+ }
+ if (s->decompressed_data_buffer) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer);
+ gpr_free(s->decompressed_data_buffer);
+ }
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -780,6 +788,15 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+ if (s->stream_compression_ctx != NULL) {
+ grpc_stream_compression_context_destroy(s->stream_compression_ctx);
+ s->stream_compression_ctx = NULL;
+ }
+ if (s->stream_decompression_ctx != NULL) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+
s->destroy_stream_arg = then_schedule_closure;
GRPC_CLOSURE_SCHED(
exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
@@ -1173,6 +1190,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
return; /* early out */
}
if (s->fetched_send_message_length == s->fetching_send_message->length) {
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(
@@ -1195,9 +1213,14 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
UINT32_MAX, &s->complete_fetch_locked)) {
- grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
- &s->fetching_slice);
- add_fetched_slice_locked(exec_ctx, t, s);
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, s->fetching_send_message, &s->fetching_slice);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+ } else {
+ add_fetched_slice_locked(exec_ctx, t, s);
+ }
}
}
}
@@ -1214,10 +1237,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
continue_fetching_send_locked(exec_ctx, t, s);
}
}
-
if (error != GRPC_ERROR_NONE) {
- /* TODO(ctiller): what to do here */
- abort();
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
}
}
@@ -1362,8 +1384,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
"fetching_send_message_finished");
} else {
GPR_ASSERT(s->fetching_send_message == NULL);
- uint8_t *frame_hdr =
- grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
+ uint8_t *frame_hdr = grpc_slice_buffer_tiny_add(
+ &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
uint32_t flags = op_payload->send_message.send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op_payload->send_message.send_message->length;
@@ -1454,14 +1476,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0) {
- if (s->pending_byte_stream) {
- already_received = s->frame_storage.length;
- } else {
- already_received = s->frame_storage.length +
- s->unprocessed_incoming_frames_buffer.length;
- }
- incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5,
- already_received);
+ already_received = s->frame_storage.length;
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1698,10 +1715,43 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
if (s->unprocessed_incoming_frames_buffer.length == 0) {
grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
&s->frame_storage);
+ s->unprocessed_incoming_frames_decompressed = false;
+ }
+ if (s->stream_compression_recv_enabled &&
+ !s->unprocessed_incoming_frames_decompressed) {
+ GPR_ASSERT(s->decompressed_data_buffer->length == 0);
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx =
+ grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer, NULL,
+ GRPC_HEADER_SIZE_IN_BYTES,
+ &end_of_context)) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Stream decompression error.");
+ } else {
+ error = grpc_deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL,
+ s->recv_message);
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(
+ s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ } else {
+ error = grpc_deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s,
+ &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
}
- error = grpc_deframe_unprocessed_incoming_frames(
- exec_ctx, &s->data_parser, s,
- &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
if (error != GRPC_ERROR_NONE) {
s->seen_error = true;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
@@ -1739,7 +1789,37 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
bool pending_data = s->pending_byte_stream ||
s->unprocessed_incoming_frames_buffer.length > 0;
+ if (s->stream_compression_recv_enabled && s->read_closed &&
+ s->frame_storage.length > 0 &&
+ s->unprocessed_incoming_frames_buffer.length == 0 && !pending_data &&
+ !s->seen_error && s->recv_trailing_metadata_finished != NULL) {
+ /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
+ * maybe decompress the next 5 bytes in the stream. */
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx = grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->frame_storage,
+ &s->unprocessed_incoming_frames_buffer, NULL,
+ GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ s->seen_error = true;
+ } else {
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ s->unprocessed_incoming_frames_decompressed = true;
+ }
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ }
if (s->read_closed && s->frame_storage.length == 0 &&
+ s->unprocessed_incoming_frames_buffer.length == 0 &&
(!pending_data || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -2607,6 +2687,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
+ s->unprocessed_incoming_frames_decompressed = false;
GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete,
@@ -2668,17 +2749,41 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream;
+ grpc_error *error;
if (s->unprocessed_incoming_frames_buffer.length > 0) {
- grpc_error *error = grpc_deframe_unprocessed_incoming_frames(
+ if (s->stream_compression_recv_enabled &&
+ !s->unprocessed_incoming_frames_decompressed) {
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx = grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer, NULL, MAX_SIZE_T,
+ &end_of_context)) {
+ error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
+ return error;
+ }
+ GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+ grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer);
+ s->unprocessed_incoming_frames_decompressed = true;
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ error = grpc_deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
slice, NULL);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
- grpc_error *error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
@@ -2686,22 +2791,9 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream);
-
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
- grpc_error *error_ignored) {
- grpc_chttp2_incoming_byte_stream *bs = byte_stream;
- grpc_chttp2_stream *s = bs->stream;
- grpc_chttp2_transport *t = s->t;
-
- GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
- incoming_byte_stream_unref(exec_ctx, bs);
- s->pending_byte_stream = false;
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
-}
+ grpc_error *error_ignored);
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
@@ -2768,6 +2860,33 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished(
return error;
}
+static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error) {
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, bs, error, true /* reset_on_error */));
+}
+
+static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
+ incoming_byte_stream_next, incoming_byte_stream_pull,
+ incoming_byte_stream_shutdown, incoming_byte_stream_destroy};
+
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
+ void *byte_stream,
+ grpc_error *error_ignored) {
+ grpc_chttp2_incoming_byte_stream *bs = byte_stream;
+ grpc_chttp2_stream *s = bs->stream;
+ grpc_chttp2_transport *t = s->t;
+
+ GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable);
+ incoming_byte_stream_unref(exec_ctx, bs);
+ s->pending_byte_stream = false;
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
+}
+
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags) {
@@ -2776,9 +2895,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->base.length = frame_size;
incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags;
- incoming_byte_stream->base.next = incoming_byte_stream_next;
- incoming_byte_stream->base.pull = incoming_byte_stream_pull;
- incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
+ incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable;
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index dead6be77f..222d2177b2 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -293,7 +293,6 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last) {
- /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
if (!s->pending_byte_stream) {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
@@ -304,6 +303,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_NONE);
s->on_next = NULL;
+ s->unprocessed_incoming_frames_decompressed = false;
} else {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 4563b78e75..b538d1df17 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -526,6 +526,26 @@ struct grpc_chttp2_stream {
grpc_chttp2_write_cb *on_write_finished_cbs;
grpc_chttp2_write_cb *finish_after_write;
size_t sending_bytes;
+
+ /** Whether stream compression send is enabled */
+ bool stream_compression_recv_enabled;
+ /** Whether stream compression recv is enabled */
+ bool stream_compression_send_enabled;
+ /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
+ */
+ bool unprocessed_incoming_frames_decompressed;
+ /** Stream compression decompress context */
+ grpc_stream_compression_context *stream_decompression_ctx;
+ /** Stream compression compress context */
+ grpc_stream_compression_context *stream_compression_ctx;
+
+ /** Buffer storing data that is compressed but not sent */
+ grpc_slice_buffer *compressed_data_buffer;
+ /** Amount of uncompressed bytes sent out when compressed_data_buffer is
+ * emptied */
+ size_t uncompressed_data_size;
+ /** Temporary buffer storing decompressed data */
+ grpc_slice_buffer *decompressed_data_buffer;
};
/** Transport writing call flow:
@@ -621,6 +641,9 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_closure **pclosure,
grpc_error *error, const char *desc);
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+#define MAX_SIZE_T (~(size_t)0)
+
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 315f2a67a2..c3ede08343 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -303,7 +303,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
- if (s->flow_controlled_buffer.length > 0) {
+ if (s->flow_controlled_buffer.length > 0 ||
+ (s->stream_compression_send_enabled &&
+ s->compressed_data_buffer->length > 0)) {
uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
0,
s->outgoing_window_delta +
@@ -314,21 +316,63 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
GPR_MIN(stream_outgoing_window, t->outgoing_window));
if (max_outgoing > 0) {
- uint32_t send_bytes =
- (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
- bool is_last_data_frame =
- s->fetching_send_message == NULL &&
- send_bytes == s->flow_controlled_buffer.length;
- bool is_last_frame =
- is_last_data_frame && s->send_trailing_metadata != NULL &&
- grpc_metadata_batch_is_empty(s->send_trailing_metadata);
- grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
- is_last_frame, &s->stats.outgoing,
- &t->outbuf);
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
- send_bytes);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
- send_bytes);
+ bool is_last_data_frame = false;
+ bool is_last_frame = false;
+ if (s->stream_compression_send_enabled) {
+ while ((s->flow_controlled_buffer.length > 0 ||
+ s->compressed_data_buffer->length > 0) &&
+ max_outgoing > 0) {
+ if (s->compressed_data_buffer->length > 0) {
+ uint32_t send_bytes = (uint32_t)GPR_MIN(
+ max_outgoing, s->compressed_data_buffer->length);
+ is_last_data_frame =
+ (send_bytes == s->compressed_data_buffer->length &&
+ s->flow_controlled_buffer.length == 0 &&
+ s->fetching_send_message == NULL);
+ is_last_frame =
+ is_last_data_frame && s->send_trailing_metadata != NULL &&
+ grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+ grpc_chttp2_encode_data(s->id, s->compressed_data_buffer,
+ send_bytes, is_last_frame,
+ &s->stats.outgoing, &t->outbuf);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM(
+ "write", t, s, outgoing_window_delta, send_bytes);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+ send_bytes);
+ max_outgoing -= send_bytes;
+ if (s->compressed_data_buffer->length == 0) {
+ s->sending_bytes += s->uncompressed_data_size;
+ }
+ } else {
+ if (s->stream_compression_ctx == NULL) {
+ s->stream_compression_ctx =
+ grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_COMPRESS);
+ }
+ s->uncompressed_data_size = s->flow_controlled_buffer.length;
+ GPR_ASSERT(grpc_stream_compress(
+ s->stream_compression_ctx, &s->flow_controlled_buffer,
+ s->compressed_data_buffer, NULL, MAX_SIZE_T,
+ GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
+ }
+ }
+ } else {
+ uint32_t send_bytes = (uint32_t)GPR_MIN(
+ max_outgoing, s->flow_controlled_buffer.length);
+ is_last_data_frame = s->fetching_send_message == NULL &&
+ send_bytes == s->flow_controlled_buffer.length;
+ is_last_frame =
+ is_last_data_frame && s->send_trailing_metadata != NULL &&
+ grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+ grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer,
+ send_bytes, is_last_frame,
+ &s->stats.outgoing, &t->outbuf);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
+ send_bytes);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+ send_bytes);
+ s->sending_bytes += send_bytes;
+ }
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
@@ -345,9 +389,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
&s->stats.outgoing));
}
}
- s->sending_bytes += send_bytes;
now_writing = true;
- if (s->flow_controlled_buffer.length > 0) {
+ if (s->flow_controlled_buffer.length > 0 ||
+ (s->stream_compression_send_enabled &&
+ s->compressed_data_buffer->length > 0)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s);
}
@@ -361,7 +406,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
if (s->send_trailing_metadata != NULL &&
s->fetching_send_message == NULL &&
- s->flow_controlled_buffer.length == 0) {
+ s->flow_controlled_buffer.length == 0 &&
+ (!s->stream_compression_send_enabled ||
+ s->compressed_data_buffer->length == 0)) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,
diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c
index 14498021eb..6f4b429ee2 100644
--- a/src/core/ext/transport/inproc/inproc_transport.c
+++ b/src/core/ext/transport/inproc/inproc_transport.c
@@ -72,6 +72,7 @@ typedef struct sb_list_entry {
typedef struct {
grpc_byte_stream base;
sb_list_entry *le;
+ grpc_error *shutdown_error;
} inproc_slice_byte_stream;
typedef struct {
@@ -201,24 +202,39 @@ static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs,
grpc_slice *slice) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
+ if (stream->shutdown_error != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(stream->shutdown_error);
+ }
*slice = grpc_slice_buffer_take_first(&stream->le->sb);
return GRPC_ERROR_NONE;
}
+static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *bs,
+ grpc_error *error) {
+ inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+ stream->shutdown_error = error;
+}
+
static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
sb_list_entry_destroy(exec_ctx, stream->le);
+ GRPC_ERROR_UNREF(stream->shutdown_error);
}
+static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
+ inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
+ inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
+
void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
sb_list_entry *le) {
s->base.length = (uint32_t)le->sb.length;
s->base.flags = 0;
- s->base.next = inproc_slice_byte_stream_next;
- s->base.pull = inproc_slice_byte_stream_pull;
- s->base.destroy = inproc_slice_byte_stream_destroy;
+ s->base.vtable = &inproc_slice_byte_stream_vtable;
s->le = le;
+ s->shutdown_error = GRPC_ERROR_NONE;
}
static void ref_transport(inproc_transport *t) {
@@ -956,11 +972,18 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
op->payload->send_message.send_message,
SIZE_MAX, &unused));
- grpc_byte_stream_pull(exec_ctx, op->payload->send_message.send_message,
- &message_slice);
+ error = grpc_byte_stream_pull(
+ exec_ctx, op->payload->send_message.send_message, &message_slice);
+ if (error != GRPC_ERROR_NONE) {
+ cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
+ break;
+ }
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
remaining -= GRPC_SLICE_LENGTH(message_slice);
grpc_slice_buffer_add(dest, message_slice);
} while (remaining != 0);
+ grpc_byte_stream_destroy(exec_ctx,
+ op->payload->send_message.send_message);
}
if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
diff --git a/src/core/lib/iomgr/nameser.h b/src/core/lib/iomgr/nameser.h
new file mode 100644
index 0000000000..daed6de518
--- /dev/null
+++ b/src/core/lib/iomgr/nameser.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_NAMESER_H
+#define GRPC_CORE_LIB_IOMGR_NAMESER_H
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_HAVE_ARPA_NAMESER
+
+#include <arpa/nameser.h>
+
+#else /* GRPC_HAVE_ARPA_NAMESER */
+
+typedef enum __ns_class {
+ ns_c_invalid = 0, /* Cookie. */
+ ns_c_in = 1, /* Internet. */
+ ns_c_2 = 2, /* unallocated/unsupported. */
+ ns_c_chaos = 3, /* MIT Chaos-net. */
+ ns_c_hs = 4, /* MIT Hesiod. */
+ /* Query class values which do not appear in resource records */
+ ns_c_none = 254, /* for prereq. sections in update requests */
+ ns_c_any = 255, /* Wildcard match. */
+ ns_c_max = 65536
+} ns_class;
+
+typedef enum __ns_type {
+ ns_t_invalid = 0, /* Cookie. */
+ ns_t_a = 1, /* Host address. */
+ ns_t_ns = 2, /* Authoritative server. */
+ ns_t_md = 3, /* Mail destination. */
+ ns_t_mf = 4, /* Mail forwarder. */
+ ns_t_cname = 5, /* Canonical name. */
+ ns_t_soa = 6, /* Start of authority zone. */
+ ns_t_mb = 7, /* Mailbox domain name. */
+ ns_t_mg = 8, /* Mail group member. */
+ ns_t_mr = 9, /* Mail rename name. */
+ ns_t_null = 10, /* Null resource record. */
+ ns_t_wks = 11, /* Well known service. */
+ ns_t_ptr = 12, /* Domain name pointer. */
+ ns_t_hinfo = 13, /* Host information. */
+ ns_t_minfo = 14, /* Mailbox information. */
+ ns_t_mx = 15, /* Mail routing information. */
+ ns_t_txt = 16, /* Text strings. */
+ ns_t_rp = 17, /* Responsible person. */
+ ns_t_afsdb = 18, /* AFS cell database. */
+ ns_t_x25 = 19, /* X_25 calling address. */
+ ns_t_isdn = 20, /* ISDN calling address. */
+ ns_t_rt = 21, /* Router. */
+ ns_t_nsap = 22, /* NSAP address. */
+ ns_t_nsap_ptr = 23, /* Reverse NSAP lookup (deprecated). */
+ ns_t_sig = 24, /* Security signature. */
+ ns_t_key = 25, /* Security key. */
+ ns_t_px = 26, /* X.400 mail mapping. */
+ ns_t_gpos = 27, /* Geographical position (withdrawn). */
+ ns_t_aaaa = 28, /* Ip6 Address. */
+ ns_t_loc = 29, /* Location Information. */
+ ns_t_nxt = 30, /* Next domain (security). */
+ ns_t_eid = 31, /* Endpoint identifier. */
+ ns_t_nimloc = 32, /* Nimrod Locator. */
+ ns_t_srv = 33, /* Server Selection. */
+ ns_t_atma = 34, /* ATM Address */
+ ns_t_naptr = 35, /* Naming Authority PoinTeR */
+ ns_t_kx = 36, /* Key Exchange */
+ ns_t_cert = 37, /* Certification record */
+ ns_t_a6 = 38, /* IPv6 address (deprecates AAAA) */
+ ns_t_dname = 39, /* Non-terminal DNAME (for IPv6) */
+ ns_t_sink = 40, /* Kitchen sink (experimentatl) */
+ ns_t_opt = 41, /* EDNS0 option (meta-RR) */
+ ns_t_apl = 42, /* Address prefix list (RFC3123) */
+ ns_t_ds = 43, /* Delegation Signer (RFC4034) */
+ ns_t_sshfp = 44, /* SSH Key Fingerprint (RFC4255) */
+ ns_t_rrsig = 46, /* Resource Record Signature (RFC4034) */
+ ns_t_nsec = 47, /* Next Secure (RFC4034) */
+ ns_t_dnskey = 48, /* DNS Public Key (RFC4034) */
+ ns_t_tkey = 249, /* Transaction key */
+ ns_t_tsig = 250, /* Transaction signature. */
+ ns_t_ixfr = 251, /* Incremental zone transfer. */
+ ns_t_axfr = 252, /* Transfer zone of authority. */
+ ns_t_mailb = 253, /* Transfer mailbox records. */
+ ns_t_maila = 254, /* Transfer mail agent records. */
+ ns_t_any = 255, /* Wildcard match. */
+ ns_t_zxfr = 256, /* BIND-specific, nonstandard. */
+ ns_t_max = 65536
+} ns_type;
+
+#endif /* GRPC_HAVE_ARPA_NAMESER */
+
+#endif /* GRPC_CORE_LIB_IOMGR_NAMESER_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index f5d15b4850..c12058f890 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -24,6 +24,7 @@
#if defined(GRPC_UV)
// Do nothing
#elif defined(GPR_MANYLINUX1)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -51,6 +52,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_LINUX)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -82,6 +84,7 @@
#define GRPC_POSIX_SOCKETUTILS
#endif
#elif defined(GPR_APPLE)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
@@ -93,6 +96,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_FREEBSD)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
@@ -104,6 +108,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
diff --git a/src/core/lib/support/avl.c b/src/core/lib/support/avl.c
index a6178fdbce..0e28b24c98 100644
--- a/src/core/lib/support/avl.c
+++ b/src/core/lib/support/avl.c
@@ -39,15 +39,16 @@ static gpr_avl_node *ref_node(gpr_avl_node *node) {
return node;
}
-static void unref_node(const gpr_avl_vtable *vtable, gpr_avl_node *node) {
+static void unref_node(const gpr_avl_vtable *vtable, gpr_avl_node *node,
+ void *user_data) {
if (node == NULL) {
return;
}
if (gpr_unref(&node->refs)) {
- vtable->destroy_key(node->key);
- vtable->destroy_value(node->value);
- unref_node(vtable, node->left);
- unref_node(vtable, node->right);
+ vtable->destroy_key(node->key, user_data);
+ vtable->destroy_value(node->value, user_data);
+ unref_node(vtable, node->left, user_data);
+ unref_node(vtable, node->right, user_data);
gpr_free(node);
}
}
@@ -87,30 +88,30 @@ gpr_avl_node *new_node(void *key, void *value, gpr_avl_node *left,
}
static gpr_avl_node *get(const gpr_avl_vtable *vtable, gpr_avl_node *node,
- void *key) {
+ void *key, void *user_data) {
long cmp;
if (node == NULL) {
return NULL;
}
- cmp = vtable->compare_keys(node->key, key);
+ cmp = vtable->compare_keys(node->key, key, user_data);
if (cmp == 0) {
return node;
} else if (cmp > 0) {
- return get(vtable, node->left, key);
+ return get(vtable, node->left, key, user_data);
} else {
- return get(vtable, node->right, key);
+ return get(vtable, node->right, key, user_data);
}
}
-void *gpr_avl_get(gpr_avl avl, void *key) {
- gpr_avl_node *node = get(avl.vtable, avl.root, key);
+void *gpr_avl_get(gpr_avl avl, void *key, void *user_data) {
+ gpr_avl_node *node = get(avl.vtable, avl.root, key, user_data);
return node ? node->value : NULL;
}
-int gpr_avl_maybe_get(gpr_avl avl, void *key, void **value) {
- gpr_avl_node *node = get(avl.vtable, avl.root, key);
+int gpr_avl_maybe_get(gpr_avl avl, void *key, void **value, void *user_data) {
+ gpr_avl_node *node = get(avl.vtable, avl.root, key, user_data);
if (node != NULL) {
*value = node->value;
return 1;
@@ -120,70 +121,75 @@ int gpr_avl_maybe_get(gpr_avl avl, void *key, void **value) {
static gpr_avl_node *rotate_left(const gpr_avl_vtable *vtable, void *key,
void *value, gpr_avl_node *left,
- gpr_avl_node *right) {
- gpr_avl_node *n =
- new_node(vtable->copy_key(right->key), vtable->copy_value(right->value),
- new_node(key, value, left, ref_node(right->left)),
- ref_node(right->right));
- unref_node(vtable, right);
+ gpr_avl_node *right, void *user_data) {
+ gpr_avl_node *n = new_node(vtable->copy_key(right->key, user_data),
+ vtable->copy_value(right->value, user_data),
+ new_node(key, value, left, ref_node(right->left)),
+ ref_node(right->right));
+ unref_node(vtable, right, user_data);
return n;
}
static gpr_avl_node *rotate_right(const gpr_avl_vtable *vtable, void *key,
void *value, gpr_avl_node *left,
- gpr_avl_node *right) {
- gpr_avl_node *n = new_node(
- vtable->copy_key(left->key), vtable->copy_value(left->value),
- ref_node(left->left), new_node(key, value, ref_node(left->right), right));
- unref_node(vtable, left);
+ gpr_avl_node *right, void *user_data) {
+ gpr_avl_node *n =
+ new_node(vtable->copy_key(left->key, user_data),
+ vtable->copy_value(left->value, user_data), ref_node(left->left),
+ new_node(key, value, ref_node(left->right), right));
+ unref_node(vtable, left, user_data);
return n;
}
static gpr_avl_node *rotate_left_right(const gpr_avl_vtable *vtable, void *key,
void *value, gpr_avl_node *left,
- gpr_avl_node *right) {
+ gpr_avl_node *right, void *user_data) {
/* rotate_right(..., rotate_left(left), right) */
- gpr_avl_node *n = new_node(
- vtable->copy_key(left->right->key),
- vtable->copy_value(left->right->value),
- new_node(vtable->copy_key(left->key), vtable->copy_value(left->value),
- ref_node(left->left), ref_node(left->right->left)),
- new_node(key, value, ref_node(left->right->right), right));
- unref_node(vtable, left);
+ gpr_avl_node *n =
+ new_node(vtable->copy_key(left->right->key, user_data),
+ vtable->copy_value(left->right->value, user_data),
+ new_node(vtable->copy_key(left->key, user_data),
+ vtable->copy_value(left->value, user_data),
+ ref_node(left->left), ref_node(left->right->left)),
+ new_node(key, value, ref_node(left->right->right), right));
+ unref_node(vtable, left, user_data);
return n;
}
static gpr_avl_node *rotate_right_left(const gpr_avl_vtable *vtable, void *key,
void *value, gpr_avl_node *left,
- gpr_avl_node *right) {
+ gpr_avl_node *right, void *user_data) {
/* rotate_left(..., left, rotate_right(right)) */
- gpr_avl_node *n = new_node(
- vtable->copy_key(right->left->key),
- vtable->copy_value(right->left->value),
- new_node(key, value, left, ref_node(right->left->left)),
- new_node(vtable->copy_key(right->key), vtable->copy_value(right->value),
- ref_node(right->left->right), ref_node(right->right)));
- unref_node(vtable, right);
+ gpr_avl_node *n =
+ new_node(vtable->copy_key(right->left->key, user_data),
+ vtable->copy_value(right->left->value, user_data),
+ new_node(key, value, left, ref_node(right->left->left)),
+ new_node(vtable->copy_key(right->key, user_data),
+ vtable->copy_value(right->value, user_data),
+ ref_node(right->left->right), ref_node(right->right)));
+ unref_node(vtable, right, user_data);
return n;
}
static gpr_avl_node *rebalance(const gpr_avl_vtable *vtable, void *key,
void *value, gpr_avl_node *left,
- gpr_avl_node *right) {
+ gpr_avl_node *right, void *user_data) {
switch (node_height(left) - node_height(right)) {
case 2:
if (node_height(left->left) - node_height(left->right) == -1) {
return assert_invariants(
- rotate_left_right(vtable, key, value, left, right));
+ rotate_left_right(vtable, key, value, left, right, user_data));
} else {
- return assert_invariants(rotate_right(vtable, key, value, left, right));
+ return assert_invariants(
+ rotate_right(vtable, key, value, left, right, user_data));
}
case -2:
if (node_height(right->left) - node_height(right->right) == 1) {
return assert_invariants(
- rotate_right_left(vtable, key, value, left, right));
+ rotate_right_left(vtable, key, value, left, right, user_data));
} else {
- return assert_invariants(rotate_left(vtable, key, value, left, right));
+ return assert_invariants(
+ rotate_left(vtable, key, value, left, right, user_data));
}
default:
return assert_invariants(new_node(key, value, left, right));
@@ -191,30 +197,32 @@ static gpr_avl_node *rebalance(const gpr_avl_vtable *vtable, void *key,
}
static gpr_avl_node *add_key(const gpr_avl_vtable *vtable, gpr_avl_node *node,
- void *key, void *value) {
+ void *key, void *value, void *user_data) {
long cmp;
if (node == NULL) {
return new_node(key, value, NULL, NULL);
}
- cmp = vtable->compare_keys(node->key, key);
+ cmp = vtable->compare_keys(node->key, key, user_data);
if (cmp == 0) {
return new_node(key, value, ref_node(node->left), ref_node(node->right));
} else if (cmp > 0) {
- return rebalance(
- vtable, vtable->copy_key(node->key), vtable->copy_value(node->value),
- add_key(vtable, node->left, key, value), ref_node(node->right));
+ return rebalance(vtable, vtable->copy_key(node->key, user_data),
+ vtable->copy_value(node->value, user_data),
+ add_key(vtable, node->left, key, value, user_data),
+ ref_node(node->right), user_data);
} else {
- return rebalance(vtable, vtable->copy_key(node->key),
- vtable->copy_value(node->value), ref_node(node->left),
- add_key(vtable, node->right, key, value));
+ return rebalance(
+ vtable, vtable->copy_key(node->key, user_data),
+ vtable->copy_value(node->value, user_data), ref_node(node->left),
+ add_key(vtable, node->right, key, value, user_data), user_data);
}
}
-gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value) {
+gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value, void *user_data) {
gpr_avl_node *old_root = avl.root;
- avl.root = add_key(avl.vtable, avl.root, key, value);
+ avl.root = add_key(avl.vtable, avl.root, key, value, user_data);
assert_invariants(avl.root);
- unref_node(avl.vtable, old_root);
+ unref_node(avl.vtable, old_root, user_data);
return avl;
}
@@ -233,12 +241,13 @@ static gpr_avl_node *in_order_tail(gpr_avl_node *node) {
}
static gpr_avl_node *remove_key(const gpr_avl_vtable *vtable,
- gpr_avl_node *node, void *key) {
+ gpr_avl_node *node, void *key,
+ void *user_data) {
long cmp;
if (node == NULL) {
return NULL;
}
- cmp = vtable->compare_keys(node->key, key);
+ cmp = vtable->compare_keys(node->key, key, user_data);
if (cmp == 0) {
if (node->left == NULL) {
return ref_node(node->right);
@@ -246,39 +255,45 @@ static gpr_avl_node *remove_key(const gpr_avl_vtable *vtable,
return ref_node(node->left);
} else if (node->left->height < node->right->height) {
gpr_avl_node *h = in_order_head(node->right);
- return rebalance(vtable, vtable->copy_key(h->key),
- vtable->copy_value(h->value), ref_node(node->left),
- remove_key(vtable, node->right, h->key));
+ return rebalance(
+ vtable, vtable->copy_key(h->key, user_data),
+ vtable->copy_value(h->value, user_data), ref_node(node->left),
+ remove_key(vtable, node->right, h->key, user_data), user_data);
} else {
gpr_avl_node *h = in_order_tail(node->left);
- return rebalance(
- vtable, vtable->copy_key(h->key), vtable->copy_value(h->value),
- remove_key(vtable, node->left, h->key), ref_node(node->right));
+ return rebalance(vtable, vtable->copy_key(h->key, user_data),
+ vtable->copy_value(h->value, user_data),
+ remove_key(vtable, node->left, h->key, user_data),
+ ref_node(node->right), user_data);
}
} else if (cmp > 0) {
- return rebalance(
- vtable, vtable->copy_key(node->key), vtable->copy_value(node->value),
- remove_key(vtable, node->left, key), ref_node(node->right));
+ return rebalance(vtable, vtable->copy_key(node->key, user_data),
+ vtable->copy_value(node->value, user_data),
+ remove_key(vtable, node->left, key, user_data),
+ ref_node(node->right), user_data);
} else {
- return rebalance(vtable, vtable->copy_key(node->key),
- vtable->copy_value(node->value), ref_node(node->left),
- remove_key(vtable, node->right, key));
+ return rebalance(
+ vtable, vtable->copy_key(node->key, user_data),
+ vtable->copy_value(node->value, user_data), ref_node(node->left),
+ remove_key(vtable, node->right, key, user_data), user_data);
}
}
-gpr_avl gpr_avl_remove(gpr_avl avl, void *key) {
+gpr_avl gpr_avl_remove(gpr_avl avl, void *key, void *user_data) {
gpr_avl_node *old_root = avl.root;
- avl.root = remove_key(avl.vtable, avl.root, key);
+ avl.root = remove_key(avl.vtable, avl.root, key, user_data);
assert_invariants(avl.root);
- unref_node(avl.vtable, old_root);
+ unref_node(avl.vtable, old_root, user_data);
return avl;
}
-gpr_avl gpr_avl_ref(gpr_avl avl) {
+gpr_avl gpr_avl_ref(gpr_avl avl, void *user_data) {
ref_node(avl.root);
return avl;
}
-void gpr_avl_unref(gpr_avl avl) { unref_node(avl.vtable, avl.root); }
+void gpr_avl_unref(gpr_avl avl, void *user_data) {
+ unref_node(avl.vtable, avl.root, user_data);
+}
int gpr_avl_is_empty(gpr_avl avl) { return avl.root == NULL; }
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index ef8405cca8..55934964f3 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -18,6 +18,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
@@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->cq = cq;
alarm->tag = tag;
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 2365d27307..04613f17e3 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1422,7 +1422,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1723,7 +1723,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
@@ -1844,6 +1844,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) {
return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
+ return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
case GRPC_CALL_OK:
return "GRPC_CALL_OK";
}
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 80eb80af78..e85b308850 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 978d7b4171..3d82a32e82 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
- void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ bool (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
@@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
@@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
-}
-
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_ref(&cqd->pending_events);
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
-#ifndef NDEBUG
- gpr_mu_lock(cq->mu);
- if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
- cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags =
- gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
- cq->outstanding_tag_capacity);
- }
- cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cq->mu);
-#endif
- cq->vtable->begin_op(cq, tag);
-}
-
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
@@ -576,6 +549,41 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ while (true) {
+ gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
+ if (count == 0) {
+ return false;
+ } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
+ break;
+ }
+ }
+ return true;
+}
+
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ GPR_ASSERT(!cqd->shutdown_called);
+ gpr_ref(&cqd->pending_events);
+ return true;
+}
+
+bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ return cq->vtable->begin_op(cq, tag);
+}
+
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
@@ -855,8 +863,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
inconsistent state. If it is the latter, we shold do a 0-timeout poll
so that the thread comes back quickly from poll to make a second
attempt at popping. Not doing this can potentially deadlock this
- thread
- forever (if the deadline is infinity) */
+ thread forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
}
@@ -869,10 +876,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
if (cq_event_queue_num_items(&cqd->queue) > 0) {
/* Go to the beginning of the loop. No point doing a poll because
(cq->shutdown == true) is only possible when there is no pending
- work
- (i.e cq->pending_events == 0) and any outstanding
- grpc_cq_completion
- events are already queued on this cq */
+ work (i.e cq->pending_events == 0) and any outstanding completion
+ events should have already been queued on this cq */
continue;
}
@@ -909,11 +914,6 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
is_finished_arg.first_loop = false;
}
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
-
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
@@ -921,6 +921,11 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
gpr_mu_unlock(cq->mu);
}
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+
GPR_TIMER_END("grpc_completion_queue_next", 0);
return ret;
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index af44482513..69d144bd95 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
- \a tag is currently used only in debug builds. */
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
+ \a tag is currently used only in debug builds. Return true on success, and
+ false if completion_queue has been shutdown. */
+bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index fce7f8dca1..66dcc299aa 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* stay locked, and gather up some stuff to do */
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
NULL, gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
@@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index 3355814017..fb03a10315 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -19,29 +19,37 @@
#include "src/core/lib/transport/byte_stream.h"
#include <stdlib.h>
+#include <string.h>
#include <grpc/support/log.h>
#include "src/core/lib/slice/slice_internal.h"
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, size_t max_size_hint,
- grpc_closure *on_complete) {
- return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete);
+bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete) {
+ return byte_stream->vtable->next(exec_ctx, byte_stream, max_size_hint,
+ on_complete);
}
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
- return byte_stream->pull(exec_ctx, byte_stream, slice);
+ return byte_stream->vtable->pull(exec_ctx, byte_stream, slice);
+}
+
+void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error) {
+ byte_stream->vtable->shutdown(exec_ctx, byte_stream, error);
}
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
- byte_stream->destroy(exec_ctx, byte_stream);
+ byte_stream->vtable->destroy(exec_ctx, byte_stream);
}
-/* slice_buffer_stream */
+// grpc_slice_buffer_stream
static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
@@ -56,6 +64,9 @@ static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ if (stream->shutdown_error != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(stream->shutdown_error);
+ }
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
@@ -63,8 +74,23 @@ static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
+static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error) {
+ grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+ stream->shutdown_error = error;
+}
+
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream) {}
+ grpc_byte_stream *byte_stream) {
+ grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+}
+
+static const grpc_byte_stream_vtable slice_buffer_stream_vtable = {
+ slice_buffer_stream_next, slice_buffer_stream_pull,
+ slice_buffer_stream_shutdown, slice_buffer_stream_destroy};
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
grpc_slice_buffer *slice_buffer,
@@ -72,9 +98,89 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags;
- stream->base.next = slice_buffer_stream_next;
- stream->base.pull = slice_buffer_stream_pull;
- stream->base.destroy = slice_buffer_stream_destroy;
+ stream->base.vtable = &slice_buffer_stream_vtable;
stream->backing_buffer = slice_buffer;
stream->cursor = 0;
+ stream->shutdown_error = GRPC_ERROR_NONE;
+}
+
+// grpc_caching_byte_stream
+
+void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
+ grpc_byte_stream *underlying_stream) {
+ cache->underlying_stream = underlying_stream;
+ grpc_slice_buffer_init(&cache->cache_buffer);
+}
+
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream_cache *cache) {
+ grpc_byte_stream_destroy(exec_ctx, cache->underlying_stream);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &cache->cache_buffer);
+}
+
+static bool caching_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ size_t max_size_hint,
+ grpc_closure *on_complete) {
+ grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+ if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
+ if (stream->cursor < stream->cache->cache_buffer.count) return true;
+ return grpc_byte_stream_next(exec_ctx, stream->cache->underlying_stream,
+ max_size_hint, on_complete);
+}
+
+static grpc_error *caching_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice) {
+ grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+ if (stream->shutdown_error != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(stream->shutdown_error);
+ }
+ if (stream->cursor < stream->cache->cache_buffer.count) {
+ *slice = grpc_slice_ref_internal(
+ stream->cache->cache_buffer.slices[stream->cursor]);
+ ++stream->cursor;
+ return GRPC_ERROR_NONE;
+ }
+ grpc_error *error =
+ grpc_byte_stream_pull(exec_ctx, stream->cache->underlying_stream, slice);
+ if (error == GRPC_ERROR_NONE) {
+ ++stream->cursor;
+ grpc_slice_buffer_add(&stream->cache->cache_buffer,
+ grpc_slice_ref_internal(*slice));
+ }
+ return error;
+}
+
+static void caching_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error) {
+ grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+ stream->shutdown_error = GRPC_ERROR_REF(error);
+ grpc_byte_stream_shutdown(exec_ctx, stream->cache->underlying_stream, error);
+}
+
+static void caching_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream) {
+ grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+}
+
+static const grpc_byte_stream_vtable caching_byte_stream_vtable = {
+ caching_byte_stream_next, caching_byte_stream_pull,
+ caching_byte_stream_shutdown, caching_byte_stream_destroy};
+
+void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
+ grpc_byte_stream_cache *cache) {
+ memset(stream, 0, sizeof(*stream));
+ stream->base.length = cache->underlying_stream->length;
+ stream->base.flags = cache->underlying_stream->flags;
+ stream->base.vtable = &caching_byte_stream_vtable;
+ stream->cache = cache;
+ stream->shutdown_error = GRPC_ERROR_NONE;
+}
+
+void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream) {
+ stream->cursor = 0;
}
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index f172296e4b..1e1e8310b8 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -28,52 +28,109 @@
/** Mask of all valid internal flags. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
-struct grpc_byte_stream;
typedef struct grpc_byte_stream grpc_byte_stream;
-struct grpc_byte_stream {
- uint32_t length;
- uint32_t flags;
+typedef struct {
bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
size_t max_size_hint, grpc_closure *on_complete);
grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
grpc_slice *slice);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+ grpc_error *error);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
+} grpc_byte_stream_vtable;
+
+struct grpc_byte_stream {
+ uint32_t length;
+ uint32_t flags;
+ const grpc_byte_stream_vtable *vtable;
};
-/* returns 1 if the bytes are available immediately (in which case
- * on_complete will not be called), 0 if the bytes will be available
- * asynchronously.
- *
- * max_size_hint can be set as a hint as to the maximum number
- * of bytes that would be acceptable to read.
- */
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, size_t max_size_hint,
- grpc_closure *on_complete);
+// Returns true if the bytes are available immediately (in which case
+// on_complete will not be called), false if the bytes will be available
+// asynchronously.
+//
+// max_size_hint can be set as a hint as to the maximum number
+// of bytes that would be acceptable to read.
+bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete);
-/* returns the next slice in the byte stream when it is ready (indicated by
- * either grpc_byte_stream_next returning 1 or on_complete passed to
- * grpc_byte_stream_next is called).
- *
- * once a slice is returned into *slice, it is owned by the caller.
- */
+// Returns the next slice in the byte stream when it is ready (indicated by
+// either grpc_byte_stream_next returning true or on_complete passed to
+// grpc_byte_stream_next is called).
+//
+// Once a slice is returned into *slice, it is owned by the caller.
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice);
+// Shuts down the byte stream.
+//
+// If there is a pending call to on_complete from grpc_byte_stream_next(),
+// it will be invoked with the error passed to grpc_byte_stream_shutdown().
+//
+// The next call to grpc_byte_stream_pull() (if any) will return the error
+// passed to grpc_byte_stream_shutdown().
+void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error);
+
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
-/* grpc_byte_stream that wraps a slice buffer */
+// grpc_slice_buffer_stream
+//
+// A grpc_byte_stream that wraps a slice buffer.
+
typedef struct grpc_slice_buffer_stream {
grpc_byte_stream base;
grpc_slice_buffer *backing_buffer;
size_t cursor;
+ grpc_error *shutdown_error;
} grpc_slice_buffer_stream;
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
grpc_slice_buffer *slice_buffer,
uint32_t flags);
+// grpc_caching_byte_stream
+//
+// A grpc_byte_stream that that wraps an underlying byte stream but caches
+// the resulting slices in a slice buffer. If an initial attempt fails
+// without fully draining the underlying stream, a new caching stream
+// can be created from the same underlying cache, in which case it will
+// return whatever is in the backing buffer before continuing to read the
+// underlying stream.
+//
+// NOTE: No synchronization is done, so it is not safe to have multiple
+// grpc_caching_byte_streams simultaneously drawing from the same underlying
+// grpc_byte_stream_cache at the same time.
+
+typedef struct {
+ grpc_byte_stream *underlying_stream;
+ grpc_slice_buffer cache_buffer;
+} grpc_byte_stream_cache;
+
+// Takes ownership of underlying_stream.
+void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
+ grpc_byte_stream *underlying_stream);
+
+// Must not be called while still in use by a grpc_caching_byte_stream.
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream_cache *cache);
+
+typedef struct {
+ grpc_byte_stream base;
+ grpc_byte_stream_cache *cache;
+ size_t cursor;
+ grpc_error *shutdown_error;
+} grpc_caching_byte_stream;
+
+void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
+ grpc_byte_stream_cache *cache);
+
+// Resets the byte stream to the start of the underlying stream.
+void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream);
+
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 7281602d66..6c61f4b8d9 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -207,27 +207,35 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
return transport->vtable->get_endpoint(exec_ctx, transport);
}
+// This comment should be sung to the tune of
+// "Supercalifragilisticexpialidocious":
+//
// grpc_transport_stream_op_batch_finish_with_failure
// is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
void grpc_transport_stream_op_batch_finish_with_failure(
- grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
+ grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
grpc_error *error) {
- if (op->recv_message) {
- GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
+ if (batch->send_message) {
+ grpc_byte_stream_destroy(exec_ctx,
+ batch->payload->send_message.send_message);
+ }
+ if (batch->recv_message) {
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ batch->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
- if (op->recv_initial_metadata) {
+ if (batch->recv_initial_metadata) {
GRPC_CLOSURE_SCHED(
exec_ctx,
- op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, error);
- if (op->cancel_stream) {
- GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error);
+ GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
+ if (batch->cancel_stream) {
+ GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 84e53e683a..099138ea14 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -159,6 +159,11 @@ struct grpc_transport_stream_op_batch_payload {
} send_trailing_metadata;
struct {
+ // The transport (or a filter that decides to return a failure before
+ // the op gets down to the transport) is responsible for calling
+ // grpc_byte_stream_destroy() on this.
+ // The batch's on_complete will not be called until after the byte
+ // stream is destroyed.
grpc_byte_stream *send_message;
} send_message;
@@ -174,6 +179,10 @@ struct grpc_transport_stream_op_batch_payload {
} recv_initial_metadata;
struct {
+ // Will be set by the transport to point to the byte stream
+ // containing a received message.
+ // The caller is responsible for calling grpc_byte_stream_destroy()
+ // on this byte stream.
grpc_byte_stream **recv_message;
/** Should be enqueued when one message is ready to be processed. */
grpc_closure *recv_message_ready;