Diffstat (limited to 'src/core')
33 files changed, 1365 insertions, 761 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index c3dca14305..b83c95275f 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state(
       7, (channel, (int)last_observed_state, deadline.tv_sec,
           deadline.tv_nsec, (int)deadline.clock_type, cq, tag));
-  grpc_cq_begin_op(cq, tag);
+  GPR_ASSERT(grpc_cq_begin_op(cq, tag));
   gpr_mu_init(&w->mu);
   GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 52c6e38c87..568bb2ba8d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -88,7 +88,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
     // Record call finished, optionally setting client_failed_to_send and
     // received.
     grpc_grpclb_client_stats_add_call_finished(
-        false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
         !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
         calld->recv_initial_metadata_succeeded /* known_received */,
         calld->client_stats);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index ebce801b37..bb9217d843 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -416,9 +416,7 @@ struct rr_connectivity_data {
 static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
                             bool log) {
-  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
-    return false;
-  }
+  if (server->drop) return false;
   const grpc_grpclb_ip_address *ip = &server->ip_address;
   if (server->port >> 16 != 0) {
     if (log) {
@@ -462,7 +460,7 @@ static const grpc_lb_user_data_vtable lb_token_vtable = {
 static void parse_server(const grpc_grpclb_server *server,
                          grpc_resolved_address *addr) {
   memset(addr, 0, sizeof(*addr));
-  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
+  if (server->drop) return;
   const uint16_t netorder_port = htons((uint16_t)server->port);
   /* the addresses are given in binary format (a in(6)_addr struct) in
    * server->ip_address.bytes. */
@@ -610,7 +608,7 @@ static bool pick_from_internal_rr_locked(
     if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
       glb_policy->serverlist_index = 0;  // Wrap-around.
     }
-    if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+    if (server->drop) {
       // Not using the RR policy, so unref it.
       if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
         gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
@@ -622,11 +620,8 @@ static bool pick_from_internal_rr_locked(
       // the client_load_reporting filter, because we do not create a
       // subchannel call (and therefore no client_load_reporting filter)
       // for dropped calls.
-      grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
-      grpc_grpclb_client_stats_add_call_finished(
-          server->drop_for_rate_limiting, server->drop_for_load_balancing,
-          false /* failed_to_send */, false /* known_received */,
-          wc_arg->client_stats);
+      grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
+                                                       wc_arg->client_stats);
       grpc_grpclb_client_stats_unref(wc_arg->client_stats);
       if (force_async) {
         GPR_ASSERT(wc_arg->wrapped_closure != NULL);
@@ -715,7 +710,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
     return;
   }
   glb_policy->rr_policy = new_rr_policy;
-  grpc_error *rr_state_error = NULL;
   const grpc_connectivity_state rr_state =
       grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
@@ -741,7 +735,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
   rr_connectivity->state = rr_state;

   /* Subscribe to changes to the connectivity of the new RR */
-  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
+  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
   grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
                                                &rr_connectivity->state,
                                                &rr_connectivity->on_change);
@@ -806,32 +800,31 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
                                                void *arg, grpc_error *error) {
   rr_connectivity_data *rr_connectivity = arg;
   glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
-
-  const bool shutting_down = glb_policy->shutting_down;
-  bool unref_needed = false;
-  GRPC_ERROR_REF(error);
-
-  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
-    /* RR policy shutting down. Don't renew subscription and free the arg of
-     * this callback. In addition we need to stash away the current policy to
-     * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
-     * one, the policy would be destroyed, alongside the lock, which would
-     * result in a use-after-free */
-    unref_needed = true;
+  if (glb_policy->shutting_down) {
+    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+                              "glb_rr_connectivity_cb");
     gpr_free(rr_connectivity);
-  } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
-    update_lb_connectivity_status_locked(
-        exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
-    /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
-    grpc_lb_policy_notify_on_state_change_locked(
-        exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
-        &rr_connectivity->on_change);
+    return;
   }
-  if (unref_needed) {
+  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
+    /* An RR policy that has transitioned into the SHUTDOWN connectivity state
+     * should not be considered for picks or updates: the SHUTDOWN state is a
+     * sink, policies can't transition back from it. */
+    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
+                         "rr_connectivity_shutdown");
+    glb_policy->rr_policy = NULL;
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
-                              "rr_connectivity_cb");
+                              "glb_rr_connectivity_cb");
+    gpr_free(rr_connectivity);
+    return;
   }
-  GRPC_ERROR_UNREF(error);
+  /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
+  update_lb_connectivity_status_locked(
+      exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
+  /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
+  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+                                               &rr_connectivity->state,
+                                               &rr_connectivity->on_change);
 }

 static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
@@ -995,7 +988,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
     gpr_free(glb_policy);
     return NULL;
   }
-
   GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
                     glb_lb_channel_on_connectivity_changed_cb, glb_policy,
                     grpc_combiner_scheduler(args->combiner));
@@ -1052,7 +1044,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   glb_policy->pending_picks = NULL;
   pending_ping *pping = glb_policy->pending_pings;
   glb_policy->pending_pings = NULL;
-  if (glb_policy->rr_policy) {
+  if (glb_policy->rr_policy != NULL) {
     GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
   }
   // We destroy the LB channel here because
@@ -1309,15 +1301,14 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
 }

 static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
+  grpc_grpclb_dropped_call_counts *drop_entries =
+      request->client_stats.calls_finished_with_drop.arg;
   return request->client_stats.num_calls_started == 0 &&
          request->client_stats.num_calls_finished == 0 &&
-         request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
-             0 &&
-         request->client_stats
-                 .num_calls_finished_with_drop_for_load_balancing == 0 &&
         request->client_stats.num_calls_finished_with_client_failed_to_send ==
             0 &&
-         request->client_stats.num_calls_finished_known_received == 0;
+         request->client_stats.num_calls_finished_known_received == 0 &&
+         (drop_entries == NULL || drop_entries->num_entries == 0);
 }

 static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1332,7 +1323,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
   // Construct message payload.
   GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
   grpc_grpclb_request *request =
-      grpc_grpclb_load_report_request_create(glb_policy->client_stats);
+      grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
   // Skip client load report if the counters were all zero in the last
   // report and they are still zero in this one.
   if (load_report_counters_are_zero(request)) {
@@ -1778,7 +1769,8 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
   if (!glb_policy->watching_lb_channel) {
     // Watch the LB channel connectivity for connection.
-    glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
+    glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
+        glb_policy->lb_channel, true /* try to connect */);
     grpc_channel_element *client_channel_elem =
         grpc_channel_stack_last_element(
             grpc_channel_get_channel_stack(glb_policy->lb_channel));
     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
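Review note on the glb_update_locked hunk just above: seeding lb_channel_connectivity from grpc_channel_check_connectivity_state() instead of hard-coding GRPC_CHANNEL_INIT means the watch starts from the channel's real state, so an already-connected LB channel is not treated as freshly created. For orientation, a minimal sketch of how the public connectivity API touched by this patch (including the now-asserted grpc_cq_begin_op() path in the first hunk) is typically driven; `channel`, `cq`, and the tag value are placeholders, not taken from this diff:

    #include <grpc/grpc.h>

    /* Spin until the channel reports READY, re-arming one watch at a time. */
    static void wait_until_ready(grpc_channel *channel,
                                 grpc_completion_queue *cq) {
      grpc_connectivity_state state =
          grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */);
      while (state != GRPC_CHANNEL_READY) {
        /* Registers a tag that cq_next() returns once `state` changes. */
        grpc_channel_watch_connectivity_state(
            channel, state, gpr_inf_future(GPR_CLOCK_REALTIME), cq, (void *)1);
        grpc_event ev = grpc_completion_queue_next(
            cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
        if (ev.type != GRPC_OP_COMPLETE) break;
        state = grpc_channel_check_connectivity_state(channel, 0);
      }
    }
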
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
index c762443b7c..5b62623145 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
@@ -18,8 +18,11 @@

 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"

+#include <string.h>
+
 #include <grpc/support/alloc.h>
 #include <grpc/support/atm.h>
+#include <grpc/support/string_util.h>
 #include <grpc/support/sync.h>
 #include <grpc/support/useful.h>

@@ -29,10 +32,11 @@

 struct grpc_grpclb_client_stats {
   gpr_refcount refs;
+  // This field must only be accessed via *_locked() methods.
+  grpc_grpclb_dropped_call_counts* drop_token_counts;
+  // These fields may be accessed from multiple threads at a time.
   gpr_atm num_calls_started;
   gpr_atm num_calls_finished;
-  gpr_atm num_calls_finished_with_drop_for_rate_limiting;
-  gpr_atm num_calls_finished_with_drop_for_load_balancing;
   gpr_atm num_calls_finished_with_client_failed_to_send;
   gpr_atm num_calls_finished_known_received;
 };

@@ -51,6 +55,7 @@ grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(

 void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
   if (gpr_unref(&client_stats->refs)) {
+    grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
     gpr_free(client_stats);
   }
 }

@@ -61,21 +66,9 @@ void grpc_grpclb_client_stats_add_call_started(
 }

 void grpc_grpclb_client_stats_add_call_finished(
-    bool finished_with_drop_for_rate_limiting,
-    bool finished_with_drop_for_load_balancing,
     bool finished_with_client_failed_to_send, bool finished_known_received,
     grpc_grpclb_client_stats* client_stats) {
   gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
-  if (finished_with_drop_for_rate_limiting) {
-    gpr_atm_full_fetch_add(
-        &client_stats->num_calls_finished_with_drop_for_rate_limiting,
-        (gpr_atm)1);
-  }
-  if (finished_with_drop_for_load_balancing) {
-    gpr_atm_full_fetch_add(
-        &client_stats->num_calls_finished_with_drop_for_load_balancing,
-        (gpr_atm)1);
-  }
   if (finished_with_client_failed_to_send) {
     gpr_atm_full_fetch_add(
         &client_stats->num_calls_finished_with_client_failed_to_send,
@@ -87,32 +80,70 @@ void grpc_grpclb_client_stats_add_call_finished(
   }
 }

+void grpc_grpclb_client_stats_add_call_dropped_locked(
+    char* token, grpc_grpclb_client_stats* client_stats) {
+  // Increment num_calls_started and num_calls_finished.
+  gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
+  gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
+  // Record the drop.
+  if (client_stats->drop_token_counts == NULL) {
+    client_stats->drop_token_counts =
+        gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts));
+  }
+  grpc_grpclb_dropped_call_counts* drop_token_counts =
+      client_stats->drop_token_counts;
+  for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
+    if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
+      ++drop_token_counts->token_counts[i].count;
+      return;
+    }
+  }
+  // Not found, so add a new entry. We double the size of the array each time.
+  size_t new_num_entries = 2;
+  while (new_num_entries < drop_token_counts->num_entries + 1) {
+    new_num_entries *= 2;
+  }
+  drop_token_counts->token_counts =
+      gpr_realloc(drop_token_counts->token_counts,
+                  new_num_entries * sizeof(grpc_grpclb_drop_token_count));
+  grpc_grpclb_drop_token_count* new_entry =
+      &drop_token_counts->token_counts[drop_token_counts->num_entries++];
+  new_entry->token = gpr_strdup(token);
+  new_entry->count = 1;
+}
+
 static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
   *value = (int64_t)gpr_atm_acq_load(counter);
   gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
 }

-void grpc_grpclb_client_stats_get(
+void grpc_grpclb_client_stats_get_locked(
     grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
     int64_t* num_calls_finished,
-    int64_t* num_calls_finished_with_drop_for_rate_limiting,
-    int64_t* num_calls_finished_with_drop_for_load_balancing,
     int64_t* num_calls_finished_with_client_failed_to_send,
-    int64_t* num_calls_finished_known_received) {
+    int64_t* num_calls_finished_known_received,
+    grpc_grpclb_dropped_call_counts** drop_token_counts) {
   atomic_get_and_reset_counter(num_calls_started,
                                &client_stats->num_calls_started);
   atomic_get_and_reset_counter(num_calls_finished,
                                &client_stats->num_calls_finished);
   atomic_get_and_reset_counter(
-      num_calls_finished_with_drop_for_rate_limiting,
-      &client_stats->num_calls_finished_with_drop_for_rate_limiting);
-  atomic_get_and_reset_counter(
-      num_calls_finished_with_drop_for_load_balancing,
-      &client_stats->num_calls_finished_with_drop_for_load_balancing);
-  atomic_get_and_reset_counter(
       num_calls_finished_with_client_failed_to_send,
       &client_stats->num_calls_finished_with_client_failed_to_send);
   atomic_get_and_reset_counter(
       num_calls_finished_known_received,
       &client_stats->num_calls_finished_known_received);
+  *drop_token_counts = client_stats->drop_token_counts;
+  client_stats->drop_token_counts = NULL;
+}
+
+void grpc_grpclb_dropped_call_counts_destroy(
+    grpc_grpclb_dropped_call_counts* drop_entries) {
+  if (drop_entries != NULL) {
+    for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+      gpr_free(drop_entries->token_counts[i].token);
+    }
+    gpr_free(drop_entries->token_counts);
+    gpr_free(drop_entries);
+  }
 }
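The grow-or-increment logic in grpc_grpclb_client_stats_add_call_dropped_locked() above is easy to misread in diff form, so here it is reduced to a standalone sketch (hypothetical names, plain libc instead of the gpr allocators). Note that the struct stores no capacity field, so every new token triggers a realloc; only increments of an already-seen token avoid one:

    #include <stdlib.h>
    #include <string.h>

    typedef struct { char *token; long count; } token_count;
    typedef struct { token_count *entries; size_t num; } drop_table;

    static void record_drop(drop_table *t, const char *token) {
      for (size_t i = 0; i < t->num; ++i) {        /* linear scan for token */
        if (strcmp(t->entries[i].token, token) == 0) {
          ++t->entries[i].count;
          return;
        }
      }
      size_t cap = 2;            /* smallest power of two > num, as above */
      while (cap < t->num + 1) cap *= 2;
      t->entries = realloc(t->entries, cap * sizeof(token_count));
      t->entries[t->num].token = strdup(token);    /* table owns the copy */
      t->entries[t->num].count = 1;
      ++t->num;
    }
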
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index 4bb47d5c5c..c51e2a431a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -25,6 +25,16 @@

 typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;

+typedef struct {
+  char* token;
+  int64_t count;
+} grpc_grpclb_drop_token_count;
+
+typedef struct {
+  grpc_grpclb_drop_token_count* token_counts;
+  size_t num_entries;
+} grpc_grpclb_dropped_call_counts;
+
 grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
 grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
     grpc_grpclb_client_stats* client_stats);
@@ -33,18 +43,23 @@ void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats);
 void grpc_grpclb_client_stats_add_call_started(
     grpc_grpclb_client_stats* client_stats);
 void grpc_grpclb_client_stats_add_call_finished(
-    bool finished_with_drop_for_rate_limiting,
-    bool finished_with_drop_for_load_balancing,
     bool finished_with_client_failed_to_send, bool finished_known_received,
     grpc_grpclb_client_stats* client_stats);

-void grpc_grpclb_client_stats_get(
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+    char* token, grpc_grpclb_client_stats* client_stats);
+
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_get_locked(
     grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
     int64_t* num_calls_finished,
-    int64_t* num_calls_finished_with_drop_for_rate_limiting,
-    int64_t* num_calls_finished_with_drop_for_load_balancing,
     int64_t* num_calls_finished_with_client_failed_to_send,
-    int64_t* num_calls_finished_known_received);
+    int64_t* num_calls_finished_known_received,
+    grpc_grpclb_dropped_call_counts** drop_token_counts);
+
+void grpc_grpclb_dropped_call_counts_destroy(
+    grpc_grpclb_dropped_call_counts* drop_entries);

 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
         */
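Putting the new header to work, the intended calling pattern looks roughly like this; the sketch assumes the caller holds whatever synchronization the _locked suffix demands (in grpclb, everything runs under the policy's combiner), that `stats` came from grpc_grpclb_client_stats_create(), and the token string is made up:

    grpc_grpclb_client_stats_add_call_dropped_locked((char *)"lb-token-abc",
                                                     stats);

    int64_t started, finished, failed_to_send, known_received;
    grpc_grpclb_dropped_call_counts *drops = NULL;
    grpc_grpclb_client_stats_get_locked(stats, &started, &finished,
                                        &failed_to_send, &known_received,
                                        &drops);
    /* ... copy the snapshot into a load report ... */
    grpc_grpclb_dropped_call_counts_destroy(drops); /* caller owns snapshot */

Note that get_locked() both reads and resets: the atomic counters are zeroed and the drop table is handed off wholesale (the stats object's pointer is NULLed), which is why the caller must destroy it.
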
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index bec7c97a78..6fa29f326e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -76,7 +76,33 @@ static void populate_timestamp(gpr_timespec timestamp,
   timestamp_pb->nanos = timestamp.tv_nsec;
 }

-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+static bool encode_string(pb_ostream_t *stream, const pb_field_t *field,
+                          void *const *arg) {
+  char *str = *arg;
+  if (!pb_encode_tag_for_field(stream, field)) return false;
+  return pb_encode_string(stream, (uint8_t *)str, strlen(str));
+}
+
+static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field,
+                         void *const *arg) {
+  grpc_grpclb_dropped_call_counts *drop_entries = *arg;
+  if (drop_entries == NULL) return true;
+  for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+    if (!pb_encode_tag_for_field(stream, field)) return false;
+    grpc_lb_v1_ClientStatsPerToken drop_message;
+    drop_message.load_balance_token.funcs.encode = encode_string;
+    drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
+    drop_message.has_num_calls = true;
+    drop_message.num_calls = drop_entries->token_counts[i].count;
+    if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
+                              &drop_message)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
     grpc_grpclb_client_stats *client_stats) {
   grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request));
   req->has_client_stats = true;
@@ -84,18 +110,17 @@ grpc_grpclb_request *grpc_grpclb_load_report_request_create(
   populate_timestamp(gpr_now(GPR_CLOCK_REALTIME),
                      &req->client_stats.timestamp);
   req->client_stats.has_num_calls_started = true;
   req->client_stats.has_num_calls_finished = true;
-  req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true;
-  req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true;
   req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
   req->client_stats.has_num_calls_finished_known_received = true;
-  grpc_grpclb_client_stats_get(
+  req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
+  grpc_grpclb_client_stats_get_locked(
       client_stats, &req->client_stats.num_calls_started,
       &req->client_stats.num_calls_finished,
-      &req->client_stats.num_calls_finished_with_drop_for_rate_limiting,
-      &req->client_stats.num_calls_finished_with_drop_for_load_balancing,
       &req->client_stats.num_calls_finished_with_client_failed_to_send,
-      &req->client_stats.num_calls_finished_known_received);
+      &req->client_stats.num_calls_finished_known_received,
+      (grpc_grpclb_dropped_call_counts **)&req->client_stats
+          .calls_finished_with_drop.arg);
   return req;
 }

@@ -117,6 +142,11 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
 }

 void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
+  if (request->has_client_stats) {
+    grpc_grpclb_dropped_call_counts *drop_entries =
+        request->client_stats.calls_finished_with_drop.arg;
+    grpc_grpclb_dropped_call_counts_destroy(drop_entries);
+  }
   gpr_free(request);
 }

diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index ef8d563edc..c4a98492c9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -44,7 +44,7 @@ typedef struct {
 /** Create a request for a gRPC LB service under \a lb_service_name */
 grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name);

-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
     grpc_grpclb_client_stats *client_stats);

 /** Protocol Buffers v3-encode \a request */
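Tying the last two files together: calls_finished_with_drop is a nanopb callback field, so grpc_grpclb_load_report_request_create_locked() parks the drop snapshot in the field's `.arg` and points `.funcs.encode` at encode_drops(); the snapshot then lives exactly as long as the request. A sketch of the resulting lifecycle (the actual send path is elided):

    grpc_grpclb_request *req =
        grpc_grpclb_load_report_request_create_locked(client_stats);
    grpc_slice payload = grpc_grpclb_request_encode(req); /* invokes encode_drops() */
    /* ... attach `payload` to a send_message op on the LB call ... */
    grpc_grpclb_request_destroy(req); /* also frees the drop entries */
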
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index fb119c7fc8..6a5d54c82a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -33,14 +33,19 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
     PB_LAST_FIELD
 };

-const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
+const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = {
+    PB_FIELD( 1, STRING , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0),
+    PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0),
+    PB_LAST_FIELD
+};
+
+const pb_field_t grpc_lb_v1_ClientStats_fields[7] = {
     PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
     PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
     PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
-    PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
-    PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
-    PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
+    PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0),
     PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
+    PB_FIELD( 8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields),
     PB_LAST_FIELD
 };

@@ -62,12 +67,11 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
     PB_LAST_FIELD
 };

-const pb_field_t grpc_lb_v1_Server_fields[6] = {
+const pb_field_t grpc_lb_v1_Server_fields[5] = {
     PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
     PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
     PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
-    PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
-    PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
+    PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0),
     PB_LAST_FIELD
 };

@@ -81,7 +85,7 @@ const pb_field_t grpc_lb_v1_Server_fields[6] = {
  * numbers or field sizes that are larger than what can fit in 8 or 16 bit
  * field descriptors.
  */
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
 #endif

 #if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -92,7 +96,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
  * numbers or field sizes that are larger than what can fit in the default
  * 8 bit descriptors.
  */
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
 #endif

diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index d3ae919ec2..93333d1aed 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,6 +14,13 @@ extern "C" {
 #endif

 /* Struct definitions */
+typedef struct _grpc_lb_v1_ClientStatsPerToken {
+    pb_callback_t load_balance_token;
+    bool has_num_calls;
+    int64_t num_calls;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */
+} grpc_lb_v1_ClientStatsPerToken;
+
 typedef struct _grpc_lb_v1_Duration {
     bool has_seconds;
     int64_t seconds;
@@ -36,10 +43,8 @@ typedef struct _grpc_lb_v1_Server {
     int32_t port;
     bool has_load_balance_token;
     char load_balance_token[50];
-    bool has_drop_for_rate_limiting;
-    bool drop_for_rate_limiting;
-    bool has_drop_for_load_balancing;
-    bool drop_for_load_balancing;
+    bool has_drop;
+    bool drop;
 /* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
 } grpc_lb_v1_Server;

@@ -58,14 +63,11 @@ typedef struct _grpc_lb_v1_ClientStats {
     int64_t num_calls_started;
     bool has_num_calls_finished;
     int64_t num_calls_finished;
-    bool has_num_calls_finished_with_drop_for_rate_limiting;
-    int64_t num_calls_finished_with_drop_for_rate_limiting;
-    bool has_num_calls_finished_with_drop_for_load_balancing;
-    int64_t num_calls_finished_with_drop_for_load_balancing;
     bool has_num_calls_finished_with_client_failed_to_send;
     int64_t num_calls_finished_with_client_failed_to_send;
     bool has_num_calls_finished_known_received;
     int64_t num_calls_finished_known_received;
+    pb_callback_t calls_finished_with_drop;
 /* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
 } grpc_lb_v1_ClientStats;

@@ -107,39 +109,41 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
 #define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0}
 #define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
 #define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
 #define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
 #define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
 #define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
 #define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
 #define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
 #define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
 #define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
 #define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
 #define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
 #define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}

 /* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1
+#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2
 #define grpc_lb_v1_Duration_seconds_tag 1
 #define grpc_lb_v1_Duration_nanos_tag 2
 #define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
 #define grpc_lb_v1_Server_ip_address_tag 1
 #define grpc_lb_v1_Server_port_tag 2
 #define grpc_lb_v1_Server_load_balance_token_tag 3
-#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_Server_drop_tag 4
 #define grpc_lb_v1_Timestamp_seconds_tag 1
 #define grpc_lb_v1_Timestamp_nanos_tag 2
 #define grpc_lb_v1_ClientStats_timestamp_tag 1
 #define grpc_lb_v1_ClientStats_num_calls_started_tag 2
 #define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
 #define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
 #define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
+#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8
 #define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
 #define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
 #define grpc_lb_v1_ServerList_servers_tag 1
@@ -154,22 +158,24 @@ extern const pb_field_t grpc_lb_v1_Duration_fields[3];
 extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
 extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
 extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
+extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[7];
 extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
 extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
 extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v1_Server_fields[6];
+extern const pb_field_t grpc_lb_v1_Server_fields[5];

 /* Maximum encoded size of messages (where known) */
 #define grpc_lb_v1_Duration_size 22
 #define grpc_lb_v1_Timestamp_size 22
-#define grpc_lb_v1_LoadBalanceRequest_size 226
+#define grpc_lb_v1_LoadBalanceRequest_size (140 + grpc_lb_v1_ClientStats_size)
 #define grpc_lb_v1_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v1_ClientStats_size 90
+/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */
+/* grpc_lb_v1_ClientStats_size depends on runtime parameters */
 #define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
 #define grpc_lb_v1_InitialLoadBalanceResponse_size 90
 /* grpc_lb_v1_ServerList_size depends on runtime parameters */
-#define grpc_lb_v1_Server_size 85
+#define grpc_lb_v1_Server_size 83

 /* Message IDs (where set with "msgid" option) */
 #ifdef PB_MSGID
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index bc40165cfb..a7f7e9542c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -74,6 +74,9 @@ typedef struct round_robin_lb_policy {
   bool started_picking;
   /** are we shutting down? */
   bool shutdown;
+  /** has the policy gotten into GRPC_CHANNEL_SHUTDOWN? No picks can be
+   * serviced after this point; the policy will never transition out. */
+  bool in_connectivity_shutdown;
   /** List of picks that are waiting on connectivity */
   pending_pick *pending_picks;
@@ -420,6 +423,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                           grpc_call_context_element *context, void **user_data,
                           grpc_closure *on_complete) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+  GPR_ASSERT(!p->shutdown);
+  GPR_ASSERT(!p->in_connectivity_shutdown);
   if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
     gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
   }
@@ -532,6 +537,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
                                 "rr_shutdown");
+    p->in_connectivity_shutdown = true;
     new_state = GRPC_CHANNEL_SHUTDOWN;
   } else if (subchannel_list->num_transient_failures ==
              p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
index 9065e33613..6ec3790a5f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -33,13 +33,13 @@
 #include <grpc/support/string_util.h>
 #include <grpc/support/time.h>
 #include <grpc/support/useful.h>
-#include <nameser.h>

 #include "src/core/ext/filters/client_channel/parse_address.h"
 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/nameser.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/support/string.h"
diff --git a/src/core/ext/filters/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c
index 3009e21d49..0c7a3ae651 100644
--- a/src/core/ext/filters/client_channel/retry_throttle.c
+++ b/src/core/ext/filters/client_channel/retry_throttle.c
@@ -130,24 +130,28 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
 // avl vtable for string -> server_retry_throttle_data map
 //

-static void* copy_server_name(void* key) { return gpr_strdup(key); }
+static void* copy_server_name(void* key, void* unused) {
+  return gpr_strdup(key);
+}

-static long compare_server_name(void* key1, void* key2) {
+static long compare_server_name(void* key1, void* key2, void* unused) {
   return strcmp(key1, key2);
 }

-static void destroy_server_retry_throttle_data(void* value) {
+static void destroy_server_retry_throttle_data(void* value, void* unused) {
   grpc_server_retry_throttle_data* throttle_data = value;
   grpc_server_retry_throttle_data_unref(throttle_data);
 }

-static void* copy_server_retry_throttle_data(void* value) {
+static void* copy_server_retry_throttle_data(void* value, void* unused) {
   grpc_server_retry_throttle_data* throttle_data = value;
   return grpc_server_retry_throttle_data_ref(throttle_data);
 }

+static void destroy_server_name(void* key, void* unused) { gpr_free(key); }
+
 static const gpr_avl_vtable avl_vtable = {
-    gpr_free /* destroy_key */, copy_server_name, compare_server_name,
+    destroy_server_name, copy_server_name, compare_server_name,
     destroy_server_retry_throttle_data, copy_server_retry_throttle_data};

 //
@@ -164,19 +168,19 @@ void grpc_retry_throttle_map_init() {

 void grpc_retry_throttle_map_shutdown() {
   gpr_mu_destroy(&g_mu);
-  gpr_avl_unref(g_avl);
+  gpr_avl_unref(g_avl, NULL);
 }

 grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
     const char* server_name, int max_milli_tokens, int milli_token_ratio) {
   gpr_mu_lock(&g_mu);
   grpc_server_retry_throttle_data* throttle_data =
-      gpr_avl_get(g_avl, (char*)server_name);
+      gpr_avl_get(g_avl, (char*)server_name, NULL);
   if (throttle_data == NULL) {
     // Entry not found. Create a new one.
     throttle_data = grpc_server_retry_throttle_data_create(
         max_milli_tokens, milli_token_ratio, NULL);
-    g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data);
+    g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data, NULL);
   } else {
     if (throttle_data->max_milli_tokens != max_milli_tokens ||
         throttle_data->milli_token_ratio != milli_token_ratio) {
@@ -184,7 +188,7 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
       // the original one.
       throttle_data = grpc_server_retry_throttle_data_create(
           max_milli_tokens, milli_token_ratio, throttle_data);
-      g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data);
+      g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data, NULL);
     } else {
       // Entry found. Increase refcount.
       grpc_server_retry_throttle_data_ref(throttle_data);
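All of the vtable churn above follows from one signature change in gpr_avl: every hook and every gpr_avl_* entry point now takes a trailing user_data pointer. A minimal, self-contained sketch of the new contract with a string-to-string map (hypothetical example; user_data is unused and passed as NULL, exactly as retry_throttle.c does, and gpr_avl_add takes ownership of the key and value it is handed):

    #include <string.h>
    #include <grpc/support/alloc.h>
    #include <grpc/support/avl.h>
    #include <grpc/support/string_util.h>

    static void free_str(void *p, void *unused) { gpr_free(p); }
    static void *copy_str(void *p, void *unused) { return gpr_strdup(p); }
    static long cmp_str(void *a, void *b, void *unused) { return strcmp(a, b); }

    /* order: destroy_key, copy_key, compare_keys, destroy_value, copy_value */
    static const gpr_avl_vtable str_map_vtable = {free_str, copy_str, cmp_str,
                                                  free_str, copy_str};

    void str_map_example(void) {
      gpr_avl map = gpr_avl_create(&str_map_vtable);
      map = gpr_avl_add(map, gpr_strdup("server"), gpr_strdup("data"), NULL);
      char *v = gpr_avl_get(map, (char *)"server", NULL); /* borrowed ref */
      (void)v;
      gpr_avl_unref(map, NULL);
    }
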
diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c
index a33ab950bf..ababd05d84 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.c
+++ b/src/core/ext/filters/client_channel/subchannel_index.c
@@ -38,26 +38,8 @@ struct grpc_subchannel_key {
   grpc_subchannel_args args;
 };

-GPR_TLS_DECL(subchannel_index_exec_ctx);
-
 static bool g_force_creation = false;

-static void enter_ctx(grpc_exec_ctx *exec_ctx) {
-  GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0);
-  gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx);
-}
-
-static void leave_ctx(grpc_exec_ctx *exec_ctx) {
-  GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == (intptr_t)exec_ctx);
-  gpr_tls_set(&subchannel_index_exec_ctx, 0);
-}
-
-static grpc_exec_ctx *current_ctx() {
-  grpc_exec_ctx *c = (grpc_exec_ctx *)gpr_tls_get(&subchannel_index_exec_ctx);
-  GPR_ASSERT(c != NULL);
-  return c;
-}
-
 static grpc_subchannel_key *create_key(
     const grpc_subchannel_args *args,
     grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) {
@@ -104,21 +86,25 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
   gpr_free(k);
 }

-static void sck_avl_destroy(void *p) {
-  grpc_subchannel_key_destroy(current_ctx(), p);
+static void sck_avl_destroy(void *p, void *user_data) {
+  grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
+  grpc_subchannel_key_destroy(exec_ctx, p);
 }

-static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); }
+static void *sck_avl_copy(void *p, void *unused) {
+  return subchannel_key_copy(p);
+}

-static long sck_avl_compare(void *a, void *b) {
+static long sck_avl_compare(void *a, void *b, void *unused) {
   return grpc_subchannel_key_compare(a, b);
 }

-static void scv_avl_destroy(void *p) {
-  GRPC_SUBCHANNEL_WEAK_UNREF(current_ctx(), p, "subchannel_index");
+static void scv_avl_destroy(void *p, void *user_data) {
+  grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
+  GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index");
 }

-static void *scv_avl_copy(void *p) {
+static void *scv_avl_copy(void *p, void *unused) {
   GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index");
   return p;
 }
@@ -133,38 +119,33 @@ static const gpr_avl_vtable subchannel_avl_vtable = {
 void grpc_subchannel_index_init(void) {
   g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable);
   gpr_mu_init(&g_mu);
-  gpr_tls_init(&subchannel_index_exec_ctx);
 }

 void grpc_subchannel_index_shutdown(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   gpr_mu_destroy(&g_mu);
-  gpr_avl_unref(g_subchannel_index);
-  gpr_tls_destroy(&subchannel_index_exec_ctx);
+  gpr_avl_unref(g_subchannel_index, &exec_ctx);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
                                             grpc_subchannel_key *key) {
-  enter_ctx(exec_ctx);
-
   // Lock, and take a reference to the subchannel index.
   // We don't need to do the search under a lock as avl's are immutable.
   gpr_mu_lock(&g_mu);
-  gpr_avl index = gpr_avl_ref(g_subchannel_index);
+  gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
   gpr_mu_unlock(&g_mu);

-  grpc_subchannel *c =
-      GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key), "index_find");
-  gpr_avl_unref(index);
+  grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
+      gpr_avl_get(index, key, exec_ctx), "index_find");
+  gpr_avl_unref(index, exec_ctx);

-  leave_ctx(exec_ctx);
   return c;
 }

 grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
                                                 grpc_subchannel_key *key,
                                                 grpc_subchannel *constructed) {
-  enter_ctx(exec_ctx);
-
   grpc_subchannel *c = NULL;
   bool need_to_unref_constructed;

@@ -174,11 +155,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
     // Compare and swap loop:
     // - take a reference to the current index
     gpr_mu_lock(&g_mu);
-    gpr_avl index = gpr_avl_ref(g_subchannel_index);
+    gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
     gpr_mu_unlock(&g_mu);

     // - Check to see if a subchannel already exists
-    c = gpr_avl_get(index, key);
+    c = gpr_avl_get(index, key, exec_ctx);
     if (c != NULL) {
       c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register");
     }
@@ -187,9 +168,9 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
       need_to_unref_constructed = true;
     } else {
       // no -> update the avl and compare/swap
-      gpr_avl updated =
-          gpr_avl_add(gpr_avl_ref(index), subchannel_key_copy(key),
-                      GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"));
+      gpr_avl updated = gpr_avl_add(
+          gpr_avl_ref(index, exec_ctx), subchannel_key_copy(key),
+          GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), exec_ctx);

       // it may happen (but it's expected to be unlikely)
       // that some other thread has changed the index:
@@ -201,13 +182,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
       }
       gpr_mu_unlock(&g_mu);

-      gpr_avl_unref(updated);
+      gpr_avl_unref(updated, exec_ctx);
     }
-    gpr_avl_unref(index);
+    gpr_avl_unref(index, exec_ctx);
   }

-  leave_ctx(exec_ctx);
-
   if (need_to_unref_constructed) {
     GRPC_SUBCHANNEL_UNREF(exec_ctx, constructed, "index_register");
   }
@@ -218,27 +197,26 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
 void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
                                       grpc_subchannel_key *key,
                                       grpc_subchannel *constructed) {
-  enter_ctx(exec_ctx);
-
   bool done = false;
   while (!done) {
     // Compare and swap loop:
     // - take a reference to the current index
     gpr_mu_lock(&g_mu);
-    gpr_avl index = gpr_avl_ref(g_subchannel_index);
+    gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
     gpr_mu_unlock(&g_mu);

     // Check to see if this key still refers to the previously
     // registered subchannel
-    grpc_subchannel *c = gpr_avl_get(index, key);
+    grpc_subchannel *c = gpr_avl_get(index, key, exec_ctx);
     if (c != constructed) {
-      gpr_avl_unref(index);
+      gpr_avl_unref(index, exec_ctx);
       break;
     }

     // compare and swap the update (some other thread may have
     // mutated the index behind us)
-    gpr_avl updated = gpr_avl_remove(gpr_avl_ref(index), key);
+    gpr_avl updated =
+        gpr_avl_remove(gpr_avl_ref(index, exec_ctx), key, exec_ctx);

     gpr_mu_lock(&g_mu);
     if (index.root == g_subchannel_index.root) {
@@ -247,11 +225,9 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
     }
     gpr_mu_unlock(&g_mu);

-    gpr_avl_unref(updated);
-    gpr_avl_unref(index);
+    gpr_avl_unref(updated, exec_ctx);
+    gpr_avl_unref(index, exec_ctx);
   }
-
-  leave_ctx(exec_ctx);
 }

 void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
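The subchannel_index.c rewrite above is what that extra parameter buys: rather than parking the current grpc_exec_ctx in a thread-local (the deleted enter_ctx()/leave_ctx()/current_ctx() machinery), callers now hand it to the AVL as user_data, and sck_avl_destroy()/scv_avl_destroy() receive it directly. The shutdown path shows the idiom; sketched in isolation (g_subchannel_index is that file's static map, and the function name here is made up):

    void shutdown_sketch(void) {
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      /* destroy hooks run in here and receive &exec_ctx as their user_data */
      gpr_avl_unref(g_subchannel_index, &exec_ctx);
      grpc_exec_ctx_finish(&exec_ctx); /* flush closures scheduled above */
    }
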
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 90f0aed7a0..3ca01a41b5 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -36,41 +36,29 @@ static const size_t kMaxPayloadSizeForGet = 2048;

 typedef struct call_data {
+  // State for handling send_initial_metadata ops.
   grpc_linked_mdelem method;
   grpc_linked_mdelem scheme;
   grpc_linked_mdelem authority;
   grpc_linked_mdelem te_trailers;
   grpc_linked_mdelem content_type;
   grpc_linked_mdelem user_agent;
-
+  // State for handling recv_initial_metadata ops.
   grpc_metadata_batch *recv_initial_metadata;
+  grpc_closure *original_recv_initial_metadata_ready;
+  grpc_closure recv_initial_metadata_ready;
+  // State for handling recv_trailing_metadata ops.
   grpc_metadata_batch *recv_trailing_metadata;
-  uint8_t *payload_bytes;
-
-  /* Vars to read data off of send_message */
-  grpc_transport_stream_op_batch *send_op;
-  uint32_t send_length;
-  uint32_t send_flags;
-  grpc_slice incoming_slice;
-  grpc_slice_buffer_stream replacement_stream;
-  grpc_slice_buffer slices;
-  /* flag that indicates that all slices of send_messages aren't availble */
-  bool send_message_blocked;
-
-  /** Closure to call when finished with the hc_on_recv hook */
-  grpc_closure *on_done_recv_initial_metadata;
-  grpc_closure *on_done_recv_trailing_metadata;
-  grpc_closure *on_complete;
-  grpc_closure *post_send;
-
-  /** Receive closures are chained: we inject this closure as the on_done_recv
-      up-call on transport_op, and remember to call our on_done_recv member
-      after handling it. */
-  grpc_closure hc_on_recv_initial_metadata;
-  grpc_closure hc_on_recv_trailing_metadata;
-  grpc_closure hc_on_complete;
-  grpc_closure got_slice;
-  grpc_closure send_done;
+  grpc_closure *original_recv_trailing_metadata_on_complete;
+  grpc_closure recv_trailing_metadata_on_complete;
+  // State for handling send_message ops.
+  grpc_transport_stream_op_batch *send_message_batch;
+  size_t send_message_bytes_read;
+  grpc_byte_stream_cache send_message_cache;
+  grpc_caching_byte_stream send_message_caching_stream;
+  grpc_closure on_send_message_next_done;
+  grpc_closure *original_send_message_on_complete;
+  grpc_closure send_message_on_complete;
 } call_data;

 typedef struct channel_data {
@@ -148,7 +136,7 @@ static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
   return GRPC_ERROR_NONE;
 }

-static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
+static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
                                         void *user_data, grpc_error *error) {
   grpc_call_element *elem = user_data;
   call_data *calld = elem->call_data;
@@ -158,11 +146,13 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
   } else {
     GRPC_ERROR_REF(error);
   }
-  GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error);
+  GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
+                   error);
 }

-static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
-                                         void *user_data, grpc_error *error) {
+static void recv_trailing_metadata_on_complete(grpc_exec_ctx *exec_ctx,
+                                               void *user_data,
+                                               grpc_error *error) {
   grpc_call_element *elem = user_data;
   call_data *calld = elem->call_data;
   if (error == GRPC_ERROR_NONE) {
@@ -171,25 +161,131 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
   } else {
     GRPC_ERROR_REF(error);
   }
-  GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_trailing_metadata, error);
+  GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_trailing_metadata_on_complete,
+                   error);
 }

-static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
-                           grpc_error *error) {
-  grpc_call_element *elem = user_data;
-  call_data *calld = elem->call_data;
-  if (calld->payload_bytes) {
-    gpr_free(calld->payload_bytes);
-    calld->payload_bytes = NULL;
+static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
+                                     grpc_error *error) {
+  grpc_call_element *elem = (grpc_call_element *)arg;
+  call_data *calld = (call_data *)elem->call_data;
+  grpc_byte_stream_cache_destroy(exec_ctx, &calld->send_message_cache);
+  GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
+                   GRPC_ERROR_REF(error));
+}
+
+// Pulls a slice from the send_message byte stream, updating
+// calld->send_message_bytes_read.
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
+                                                call_data *calld) {
+  grpc_slice incoming_slice;
+  grpc_error *error = grpc_byte_stream_pull(
+      exec_ctx, &calld->send_message_caching_stream.base, &incoming_slice);
+  if (error == GRPC_ERROR_NONE) {
+    calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice);
+    grpc_slice_unref_internal(exec_ctx, incoming_slice);
   }
-  calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error);
+  return error;
 }

-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
-  grpc_call_element *elem = elemp;
-  call_data *calld = elem->call_data;
-  grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
-  calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+// Reads as many slices as possible from the send_message byte stream.
+// Upon successful return, if calld->send_message_bytes_read ==
+// calld->send_message_caching_stream.base.length, then we have completed
+// reading from the byte stream; otherwise, an async read has been dispatched
+// and on_send_message_next_done() will be invoked when it is complete.
+static grpc_error *read_all_available_send_message_data(
+    grpc_exec_ctx *exec_ctx, call_data *calld) {
+  while (grpc_byte_stream_next(exec_ctx,
+                               &calld->send_message_caching_stream.base,
+                               ~(size_t)0, &calld->on_send_message_next_done)) {
+    grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+    if (error != GRPC_ERROR_NONE) return error;
+    if (calld->send_message_bytes_read ==
+        calld->send_message_caching_stream.base.length) {
+      break;
+    }
+  }
+  return GRPC_ERROR_NONE;
+}
+
+// Async callback for grpc_byte_stream_next().
+static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
+                                      grpc_error *error) {
+  grpc_call_element *elem = (grpc_call_element *)arg;
+  call_data *calld = (call_data *)elem->call_data;
+  if (error != GRPC_ERROR_NONE) {
+    grpc_transport_stream_op_batch_finish_with_failure(
+        exec_ctx, calld->send_message_batch, error);
+    return;
+  }
+  error = pull_slice_from_send_message(exec_ctx, calld);
+  if (error != GRPC_ERROR_NONE) {
+    grpc_transport_stream_op_batch_finish_with_failure(
+        exec_ctx, calld->send_message_batch, error);
+    return;
+  }
+  // There may or may not be more to read, but we don't care. If we got
+  // here, then we know that all of the data was not available
+  // synchronously, so we were not able to do a cached call. Instead,
+  // we just reset the byte stream and then send down the batch as-is.
+  grpc_caching_byte_stream_reset(&calld->send_message_caching_stream);
+  grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
+}
+
+static char *slice_buffer_to_string(grpc_slice_buffer *slice_buffer) {
+  char *payload_bytes = gpr_malloc(slice_buffer->length + 1);
+  size_t offset = 0;
+  for (size_t i = 0; i < slice_buffer->count; ++i) {
+    memcpy(payload_bytes + offset,
+           GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
+           GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
+    offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
+  }
+  *(payload_bytes + offset) = '\0';
+  return payload_bytes;
+}
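read_all_available_send_message_data() above leans on the byte-stream caching helpers this change pairs with (grpc_byte_stream_cache / grpc_caching_byte_stream; assumed to be declared in src/core/lib/transport/byte_stream.h at this revision). The overall dance, in outline: cache the payload while deciding between GET and POST, replay it if the batch must go down unmodified, and tear the cache down when the send completes. `original_send_message_stream` and `exec_ctx` are placeholders:

    grpc_byte_stream_cache cache;
    grpc_byte_stream_cache_init(&cache, original_send_message_stream);
    grpc_caching_byte_stream caching;
    grpc_caching_byte_stream_init(&caching, &cache);
    /* ... grpc_byte_stream_next()/grpc_byte_stream_pull() on &caching.base ... */
    grpc_caching_byte_stream_reset(&caching); /* replay from the first byte */
    /* ... later, from send_message_on_complete(): */
    grpc_byte_stream_cache_destroy(exec_ctx, &cache);
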
*/ + estimated_len += grpc_base64_estimate_encoded_size( + batch->payload->send_message.send_message->length, true /* url_safe */, + false /* multi_line */); + grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); + /* memcopy individual pieces into this slice */ + char *write_ptr = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); + char *original_path = (char *)GRPC_SLICE_START_PTR(path_slice); + memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); + write_ptr += GRPC_SLICE_LENGTH(path_slice); + *write_ptr++ = '?'; + char *payload_bytes = + slice_buffer_to_string(&calld->send_message_cache.cache_buffer); + grpc_base64_encode_core((char *)write_ptr, payload_bytes, + batch->payload->send_message.send_message->length, + true /* url_safe */, false /* multi_line */); + gpr_free(payload_bytes); + /* remove trailing unused memory and add trailing 0 to terminate string */ + char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); + /* safe to use strlen since base64_encode will always add '\0' */ + path_with_query_slice = + grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t)); + /* substitute previous path with the new path+query */ + grpc_mdelem mdelem_path_and_query = + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); + grpc_metadata_batch *b = + batch->payload->send_initial_metadata.send_initial_metadata; + return grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + mdelem_path_and_query); } static void remove_if_present(grpc_exec_ctx *exec_ctx, @@ -200,273 +296,153 @@ static void remove_if_present(grpc_exec_ctx *exec_ctx, } } -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void hc_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; - uint8_t *wrptr = calld->payload_bytes; - while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, - &calld->got_slice)) { - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice); - if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) { - memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), - GRPC_SLICE_LENGTH(calld->incoming_slice)); - } - wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - calld->send_message_blocked = false; - break; - } - } -} + channel_data *channeld = elem->channel_data; + GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0); + GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - calld->send_message_blocked = false; - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice)) { - /* Should never reach here */ - abort(); - } - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - /* Pass down the original send_message op that was blocked.*/ - grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - calld->send_flags); - calld->send_op->payload->send_message.send_message = - &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - 
calld->send_op->on_complete = &calld->send_done; - grpc_call_next_op(exec_ctx, elem, calld->send_op); - } else { - continue_send_message(exec_ctx, elem); + if (batch->recv_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_initial_metadata = + batch->payload->recv_initial_metadata.recv_initial_metadata; + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; } -} -static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - grpc_error *error; + if (batch->recv_trailing_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata_on_complete = batch->on_complete; + batch->on_complete = &calld->recv_trailing_metadata_on_complete; + } - if (op->send_initial_metadata) { - /* Decide which HTTP VERB to use. We use GET if the request is marked - cacheable, and the operation contains both initial metadata and send - message, and the payload is below the size threshold, and all the data - for this request is immediately available. */ + grpc_error *error = GRPC_ERROR_NONE; + bool batch_will_be_handled_asynchronously = false; + if (batch->send_initial_metadata) { + // Decide which HTTP VERB to use. We use GET if the request is marked + // cacheable, and the operation contains both initial metadata and send + // message, and the payload is below the size threshold, and all the data + // for this request is immediately available. grpc_mdelem method = GRPC_MDELEM_METHOD_POST; - if (op->send_message && - (op->payload->send_initial_metadata.send_initial_metadata_flags & + if (batch->send_message && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && - op->payload->send_message.send_message->length < + batch->payload->send_message.send_message->length < channeld->max_payload_size_for_get) { - method = GRPC_MDELEM_METHOD_GET; - /* The following write to calld->send_message_blocked isn't racy with - reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because - being here means ops->send_message is not NULL, which is primarily - guarding the read there. */ - calld->send_message_blocked = true; - } else if (op->payload->send_initial_metadata.send_initial_metadata_flags & - GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { - method = GRPC_MDELEM_METHOD_PUT; - } - - /* Attempt to read the data from send_message and create a header field. 
*/ - if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) { - /* allocate memory to hold the entire payload */ - calld->payload_bytes = - gpr_malloc(op->payload->send_message.send_message->length); - - /* read slices of send_message and copy into payload_bytes */ - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); - - if (calld->send_message_blocked == false) { - /* when all the send_message data is available, then modify the path - * MDELEM by appending base64 encoded query to the path */ - const int k_url_safe = 1; - const int k_multi_line = 0; - const unsigned char k_query_separator = '?'; - - grpc_slice path_slice = - GRPC_MDVALUE(op->payload->send_initial_metadata - .send_initial_metadata->idx.named.path->md); - /* sum up individual component's lengths and allocate enough memory to - * hold combined path+query */ - size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); - estimated_len++; /* for the '?' */ - estimated_len += grpc_base64_estimate_encoded_size( - op->payload->send_message.send_message->length, k_url_safe, - k_multi_line); - grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); - - /* memcopy individual pieces into this slice */ - uint8_t *write_ptr = - (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice); - uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); - memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); - write_ptr += GRPC_SLICE_LENGTH(path_slice); - - *write_ptr = k_query_separator; - write_ptr++; /* for the '?' */ - - grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes, - op->payload->send_message.send_message->length, - k_url_safe, k_multi_line); - - /* remove trailing unused memory and add trailing 0 to terminate string - */ - char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); - /* safe to use strlen since base64_encode will always add '\0' */ - path_with_query_slice = - grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t)); - - /* substitute previous path with the new path+query */ - grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); - grpc_metadata_batch *b = - op->payload->send_initial_metadata.send_initial_metadata; - error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, - mdelem_path_and_query); - if (error != GRPC_ERROR_NONE) return error; - - calld->on_complete = op->on_complete; - op->on_complete = &calld->hc_on_complete; - op->send_message = false; + calld->send_message_bytes_read = 0; + grpc_byte_stream_cache_init(&calld->send_message_cache, + batch->payload->send_message.send_message); + grpc_caching_byte_stream_init(&calld->send_message_caching_stream, + &calld->send_message_cache); + batch->payload->send_message.send_message = + &calld->send_message_caching_stream.base; + calld->original_send_message_on_complete = batch->on_complete; + batch->on_complete = &calld->send_message_on_complete; + calld->send_message_batch = batch; + error = read_all_available_send_message_data(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) goto done; + // If all the data has been read, then we can use GET. 
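
The GET path above leans on the caching byte stream: every byte pulled while probing whether the whole payload is available synchronously is retained, so when the probe falls short the stream can be reset and replayed unchanged for the normal POST path. Below is a minimal standalone sketch of that reset-and-replay contract; the types and names are invented for illustration and are not gRPC's grpc_byte_stream API.

#include <stdio.h>

/* Hypothetical simplified model of a caching byte stream: bytes pulled
 * from the underlying source are appended to a cache, so the stream can
 * be reset and replayed without re-reading the source. */
typedef struct {
  const char *src;  /* underlying data source */
  size_t src_len;
  char cache[256];  /* bytes already pulled at least once */
  size_t cached;    /* how much of src has been cached */
  size_t pos;       /* read cursor of the caching view */
} caching_stream;

static int caching_stream_pull(caching_stream *s, char *out) {
  if (s->pos == s->src_len) return 0; /* fully consumed */
  if (s->pos == s->cached) {          /* fault in one more byte */
    s->cache[s->cached++] = s->src[s->pos];
  }
  *out = s->cache[s->pos++];          /* always serve from the cache */
  return 1;
}

static void caching_stream_reset(caching_stream *s) { s->pos = 0; }

int main(void) {
  caching_stream s = {"payload", 7, {0}, 0, 0};
  char c;
  while (caching_stream_pull(&s, &c)) putchar(c); /* first pass: caches */
  putchar('\n');
  caching_stream_reset(&s);
  while (caching_stream_pull(&s, &c)) putchar(c); /* replay from cache */
  putchar('\n');
  return 0;
}

The key invariant is that pull() serves from the cache whenever the cursor is behind it, so a reset never consumes the underlying source twice.
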
+ if (calld->send_message_bytes_read == + calld->send_message_caching_stream.base.length) { + method = GRPC_MDELEM_METHOD_GET; + error = update_path_for_get(exec_ctx, elem, batch); + if (error != GRPC_ERROR_NONE) goto done; + batch->send_message = false; + grpc_byte_stream_destroy(exec_ctx, + &calld->send_message_caching_stream.base); } else { - /* Not all data is available. Fall back to POST. */ + // Not all data is available. The batch will be sent down + // asynchronously in on_send_message_next_done(). + batch_will_be_handled_asynchronously = true; + // Fall back to POST. gpr_log(GPR_DEBUG, - "Request is marked Cacheable but not all data is available.\ - Falling back to POST"); - method = GRPC_MDELEM_METHOD_POST; + "Request is marked Cacheable but not all data is available. " + "Falling back to POST"); } + } else if (batch->payload->send_initial_metadata + .send_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { + method = GRPC_MDELEM_METHOD_PUT; } - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_METHOD); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_SCHEME); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_TE); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_CONTENT_TYPE); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_USER_AGENT); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_METHOD); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_SCHEME); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_TE); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_CONTENT_TYPE); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_USER_AGENT); /* Send : prefixed headers, which have to be before any application layer headers. 
*/ error = grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->method, method); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->scheme, channeld->static_scheme); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; } - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->on_done_recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->hc_on_recv_initial_metadata; - } - - if (op->recv_trailing_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_trailing_metadata = - op->payload->recv_trailing_metadata.recv_trailing_metadata; - calld->on_done_recv_trailing_metadata = op->on_complete; - op->on_complete = &calld->hc_on_recv_trailing_metadata; - } - - return GRPC_ERROR_NONE; -} - -static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GPR_TIMER_BEGIN("hc_start_transport_op", 0); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_error *error = hc_mutate_op(exec_ctx, elem, op); +done: if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - } else { - call_data *calld = elem->call_data; - if (op->send_message && calld->send_message_blocked) { - /* Don't forward the op. send_message contains slices that aren't ready - yet. The call will be forwarded by the op_complete of slice read call. 
- */ - } else { - grpc_call_next_op(exec_ctx, elem, op); - } + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + } else if (!batch_will_be_handled_asynchronously) { + grpc_call_next_op(exec_ctx, elem, batch); } - GPR_TIMER_END("hc_start_transport_op", 0); + GPR_TIMER_END("hc_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - call_data *calld = elem->call_data; - calld->on_done_recv_initial_metadata = NULL; - calld->on_done_recv_trailing_metadata = NULL; - calld->on_complete = NULL; - calld->payload_bytes = NULL; - calld->send_message_blocked = false; - grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->hc_on_recv_initial_metadata, - hc_on_recv_initial_metadata, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hc_on_recv_trailing_metadata, - hc_on_recv_trailing_metadata, elem, + call_data *calld = (call_data *)elem->call_data; + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hc_on_complete, hc_on_complete, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete, + recv_trailing_metadata_on_complete, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, + on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - grpc_closure *ignored) { - call_data *calld = elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); -} + grpc_closure *ignored) {} static grpc_mdelem scheme_from_args(const grpc_channel_args *args) { unsigned i; @@ -580,7 +556,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_http_client_filter = { - hc_start_transport_op, + hc_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 71a8bc5bec..20a3488115 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -61,14 +61,11 @@ typedef struct call_data { pointer | CANCELLED_BIT - request was cancelled with error pointed to */ gpr_atm send_initial_metadata_state; - grpc_transport_stream_op_batch *send_op; - uint32_t send_length; - uint32_t send_flags; - grpc_slice incoming_slice; + grpc_transport_stream_op_batch *send_message_batch; grpc_slice_buffer_stream replacement_stream; - grpc_closure *post_send; - grpc_closure send_done; - grpc_closure got_slice; + grpc_closure *original_send_message_on_complete; + grpc_closure send_message_on_complete; + grpc_closure on_send_message_next_done; } call_data; typedef struct channel_data { @@ -164,24 +161,25 @@ static grpc_error *process_send_initial_metadata( return error; } -static void 
continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem); - -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; +static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, + GRPC_ERROR_REF(error)); } static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - call_data *calld = elem->call_data; - int did_compress; + call_data *calld = (call_data *)elem->call_data; + // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); - did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); + uint32_t send_flags = + calld->send_message_batch->payload->send_message.send_message->flags; + const bool did_compress = grpc_msg_compress( + exec_ctx, calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -195,7 +193,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, algo_name, before_size, after_size, 100 * savings_ratio); } grpc_slice_buffer_swap(&calld->slices, &tmp); - calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; + send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -207,83 +205,118 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, algo_name, calld->slices.length); } } - grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); - + // Swap out the original byte stream with our new one and send the + // batch down. + grpc_byte_stream_destroy( + exec_ctx, calld->send_message_batch->payload->send_message.send_message); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - calld->send_flags); - calld->send_op->payload->send_message.send_message = + send_flags); + calld->send_message_batch->payload->send_message.send_message = &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - calld->send_op->on_complete = &calld->send_done; - - grpc_call_next_op(exec_ctx, elem, calld->send_op); + calld->original_send_message_on_complete = + calld->send_message_batch->on_complete; + calld->send_message_batch->on_complete = &calld->send_message_on_complete; + grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); } -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice)) { - /* Should never reach here */ - abort(); - } - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - finish_send_message(exec_ctx, elem); - } else { - continue_send_message(exec_ctx, elem); +// Pulls a slice from the send_message byte stream and adds it to calld->slices. 
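
finish_send_message() above follows a try-and-compare pattern: compress into a temporary buffer, and only swap it in (setting GRPC_WRITE_INTERNAL_COMPRESS) when the output is actually smaller; otherwise the original slices are sent and the trace log notes the skipped compression. Here is a self-contained illustration of that swap-if-smaller decision, with a toy run-length encoder standing in for grpc_msg_compress(); the encoder, message, and flag value are invented for the example.

#include <stdint.h>
#include <stdio.h>

#define FLAG_INTERNAL_COMPRESS 1u /* stand-in for GRPC_WRITE_INTERNAL_COMPRESS */

/* Toy run-length encoder in place of grpc_msg_compress(); real code uses
 * the negotiated algorithm. Note it may expand incompressible input. */
static size_t toy_compress(const char *in, size_t n, char *out) {
  size_t o = 0;
  for (size_t i = 0; i < n;) {
    size_t run = 1;
    while (i + run < n && in[i + run] == in[i] && run < 9) run++;
    out[o++] = (char)('0' + run); /* run length */
    out[o++] = in[i];             /* repeated byte */
    i += run;
  }
  return o;
}

int main(void) {
  const char *msg = "aaaaaabbbbbbbbcc";
  char tmp[64];
  size_t before = 16;
  size_t after = toy_compress(msg, before, tmp);
  uint32_t send_flags = 0;
  if (after < before) { /* did_compress: swap in the smaller buffer */
    send_flags |= FLAG_INTERNAL_COMPRESS;
    printf("compressed %zu -> %zu bytes (%.2f%% savings), flags=0x%x\n",
           before, after, 100.0 * (double)(before - after) / (double)before,
           send_flags);
  } else { /* keep the original bytes; never send a larger message */
    printf("skipped compression: %zu -> %zu bytes\n", before, after);
  }
  return 0;
}
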
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, + call_data *calld) { + grpc_slice incoming_slice; + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, calld->send_message_batch->payload->send_message.send_message, + &incoming_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&calld->slices, incoming_slice); } + return error; } -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = elem->call_data; +// Reads as many slices as possible from the send_message byte stream. +// If all data has been read, invokes finish_send_message(). Otherwise, +// an async call to grpc_byte_stream_next() has been started, which will +// eventually result in calling on_send_message_next_done(). +static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = (call_data *)elem->call_data; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, - &calld->got_slice)) { - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice); - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { + exec_ctx, calld->send_message_batch->payload->send_message.send_message, + ~(size_t)0, &calld->on_send_message_next_done)) { + grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) return error; + if (calld->slices.length == + calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); break; } } + return GRPC_ERROR_NONE; } -static void handle_send_message_batch(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op, - bool has_compression_algorithm) { - call_data *calld = elem->call_data; - if (!skip_compression(elem, op->payload->send_message.send_message->flags, +// Async callback for grpc_byte_stream_next(). +static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; + if (error != GRPC_ERROR_NONE) goto fail; + error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) goto fail; + if (calld->slices.length == + calld->send_message_batch->payload->send_message.send_message->length) { + finish_send_message(exec_ctx, elem); + } else { + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // this function again. 
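
The loop above encodes the byte-stream reading protocol: grpc_byte_stream_next() returns true when a slice can be pulled immediately, and false after arranging for a closure to run once data arrives, which then re-enters the same drain loop. A runnable toy model of that sync/async handoff follows; the types here are invented stand-ins for grpc_byte_stream and grpc_closure.

#include <stdbool.h>
#include <stdio.h>

typedef void (*cb_fn)(void *arg);

/* Simplified stream: next() is true when a slice is ready now, otherwise
 * it records a callback that deliver() will fire when data arrives. */
typedef struct {
  int available;  /* slices pullable right now */
  int remaining;  /* slices the stream will produce in total */
  cb_fn on_ready; /* pending async callback, if any */
  void *arg;
} byte_stream;

static bool stream_next(byte_stream *s, cb_fn cb, void *arg) {
  if (s->available > 0) return true; /* sync path: caller pulls now */
  s->on_ready = cb;                  /* async path: callback re-enters */
  s->arg = arg;
  return false;
}

static void stream_pull(byte_stream *s) { s->available--; s->remaining--; }

/* Arrival of more data: run the waiting callback, mirroring how
 * on_send_message_next_done() re-enters continue_reading_send_message(). */
static void deliver(byte_stream *s, int n) {
  s->available += n;
  if (s->on_ready) { cb_fn cb = s->on_ready; s->on_ready = NULL; cb(s->arg); }
}

static void drain(void *arg) {
  byte_stream *s = arg;
  while (s->remaining > 0 && stream_next(s, drain, s)) {
    stream_pull(s);
    printf("pulled a slice; %d remaining\n", s->remaining);
  }
  if (s->remaining == 0) printf("all data read: finish_send_message()\n");
}

int main(void) {
  byte_stream s = {1, 3, NULL, NULL};
  drain(&s);      /* pulls one slice, then parks waiting for more */
  deliver(&s, 2); /* async arrival resumes the drain loop */
  return 0;
}
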
+ error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) goto fail; + } + return; +fail: + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); +} + +static void start_send_message_batch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *batch, + bool has_compression_algorithm) { + call_data *calld = (call_data *)elem->call_data; + if (!skip_compression(elem, batch->payload->send_message.send_message->flags, has_compression_algorithm)) { - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); + calld->send_message_batch = batch; + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // on_send_message_next_done(). + grpc_error *error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + } } else { /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } } static void compress_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); - if (op->cancel_stream) { - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); + if (batch->cancel_stream) { + // TODO(roth): As part of the upcoming call combiner work, change + // this to call grpc_byte_stream_shutdown() on the incoming byte + // stream, to cancel any in-flight calls to grpc_byte_stream_next(). 
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); gpr_atm cur = gpr_atm_full_xchg( &calld->send_initial_metadata_state, - CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error); + CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error); switch (cur) { case HAS_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM: @@ -293,7 +326,7 @@ static void compress_start_transport_stream_op_batch( if ((cur & CANCELLED_BIT) == 0) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, (grpc_transport_stream_op_batch *)cur, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error)); } else { GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT)); } @@ -301,14 +334,15 @@ static void compress_start_transport_stream_op_batch( } } - if (op->send_initial_metadata) { + if (batch->send_initial_metadata) { bool has_compression_algorithm; grpc_error *error = process_send_initial_metadata( exec_ctx, elem, - op->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, + error); return; } gpr_atm cur; @@ -324,32 +358,32 @@ static void compress_start_transport_stream_op_batch( goto retry_send_im; } if (cur != INITIAL_METADATA_UNSEEN) { - handle_send_message_batch(exec_ctx, elem, - (grpc_transport_stream_op_batch *)cur, - has_compression_algorithm); + start_send_message_batch(exec_ctx, elem, + (grpc_transport_stream_op_batch *)cur, + has_compression_algorithm); } } } - if (op->send_message) { + if (batch->send_message) { gpr_atm cur; retry_send: cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); switch (cur) { case INITIAL_METADATA_UNSEEN: if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, - (gpr_atm)op)) { + (gpr_atm)batch)) { goto retry_send; } break; case HAS_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM: - handle_send_message_batch(exec_ctx, elem, op, - cur == HAS_COMPRESSION_ALGORITHM); + start_send_message_batch(exec_ctx, elem, batch, + cur == HAS_COMPRESSION_ALGORITHM); break; default: if (cur & CANCELLED_BIT) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, + exec_ctx, batch, GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT))); } else { /* >1 send_message concurrently */ @@ -358,7 +392,7 @@ static void compress_start_transport_stream_op_batch( } } else { /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); @@ -373,10 +407,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* initialize members */ grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, + on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 731ebf400f..dc35f4855c 100644 --- 
a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -730,6 +730,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); + if (s->compressed_data_buffer) { + grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer); + gpr_free(s->compressed_data_buffer); + } + if (s->decompressed_data_buffer) { + grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer); + gpr_free(s->decompressed_data_buffer); + } grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -780,6 +788,15 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + if (s->stream_compression_ctx != NULL) { + grpc_stream_compression_context_destroy(s->stream_compression_ctx); + s->stream_compression_ctx = NULL; + } + if (s->stream_decompression_ctx != NULL) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + s->destroy_stream_arg = then_schedule_closure; GRPC_CLOSURE_SCHED( exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, @@ -1173,6 +1190,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } if (s->fetched_send_message_length == s->fetching_send_message->length) { + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( @@ -1195,9 +1213,14 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, UINT32_MAX, &s->complete_fetch_locked)) { - grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, - &s->fetching_slice); - add_fetched_slice_locked(exec_ctx, t, s); + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, s->fetching_send_message, &s->fetching_slice); + if (error != GRPC_ERROR_NONE) { + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + } else { + add_fetched_slice_locked(exec_ctx, t, s); + } } } } @@ -1214,10 +1237,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, continue_fetching_send_locked(exec_ctx, t, s); } } - if (error != GRPC_ERROR_NONE) { - /* TODO(ctiller): what to do here */ - abort(); + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); } } @@ -1362,8 +1384,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, "fetching_send_message_finished"); } else { GPR_ASSERT(s->fetching_send_message == NULL); - uint8_t *frame_hdr = - grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); + uint8_t *frame_hdr = grpc_slice_buffer_tiny_add( + &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES); uint32_t flags = op_payload->send_message.send_message->flags; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; size_t len = op_payload->send_message.send_message->length; @@ -1454,14 +1476,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->recv_message_ready = 
op_payload->recv_message.recv_message_ready; s->recv_message = op_payload->recv_message.recv_message; if (s->id != 0) { - if (s->pending_byte_stream) { - already_received = s->frame_storage.length; - } else { - already_received = s->frame_storage.length + - s->unprocessed_incoming_frames_buffer.length; - } - incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, - already_received); + already_received = s->frame_storage.length; + incoming_byte_stream_update_flow_control( + exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received); } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -1698,10 +1715,43 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, if (s->unprocessed_incoming_frames_buffer.length == 0) { grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, &s->frame_storage); + s->unprocessed_incoming_frames_decompressed = false; + } + if (s->stream_compression_recv_enabled && + !s->unprocessed_incoming_frames_decompressed) { + GPR_ASSERT(s->decompressed_data_buffer->length == 0); + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = + grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer, NULL, + GRPC_HEADER_SIZE_IN_BYTES, + &end_of_context)) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Stream decompression error."); + } else { + error = grpc_deframe_unprocessed_incoming_frames( + exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL, + s->recv_message); + if (end_of_context) { + grpc_stream_compression_context_destroy( + s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + } else { + error = grpc_deframe_unprocessed_incoming_frames( + exec_ctx, &s->data_parser, s, + &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); } - error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, - &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); if (error != GRPC_ERROR_NONE) { s->seen_error = true; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -1739,7 +1789,37 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } bool pending_data = s->pending_byte_stream || s->unprocessed_incoming_frames_buffer.length > 0; + if (s->stream_compression_recv_enabled && s->read_closed && + s->frame_storage.length > 0 && + s->unprocessed_incoming_frames_buffer.length == 0 && !pending_data && + !s->seen_error && s->recv_trailing_metadata_finished != NULL) { + /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and + * maybe decompress the next 5 bytes in the stream. 
*/ + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->frame_storage, + &s->unprocessed_incoming_frames_buffer, NULL, + GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); + s->seen_error = true; + } else { + if (s->unprocessed_incoming_frames_buffer.length > 0) { + s->unprocessed_incoming_frames_decompressed = true; + } + if (end_of_context) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + } if (s->read_closed && s->frame_storage.length == 0 && + s->unprocessed_incoming_frames_buffer.length == 0 && (!pending_data || s->seen_error) && s->recv_trailing_metadata_finished != NULL) { grpc_chttp2_incoming_metadata_buffer_publish( @@ -2607,6 +2687,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, if (s->frame_storage.length > 0) { grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); + s->unprocessed_incoming_frames_decompressed = false; GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, @@ -2668,17 +2749,41 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_stream *s = bs->stream; + grpc_error *error; if (s->unprocessed_incoming_frames_buffer.length > 0) { - grpc_error *error = grpc_deframe_unprocessed_incoming_frames( + if (s->stream_compression_recv_enabled && + !s->unprocessed_incoming_frames_decompressed) { + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer, NULL, MAX_SIZE_T, + &end_of_context)) { + error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); + return error; + } + GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); + grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer); + s->unprocessed_incoming_frames_decompressed = true; + if (end_of_context) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + error = grpc_deframe_unprocessed_incoming_frames( exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice, NULL); if (error != GRPC_ERROR_NONE) { return error; } } else { - grpc_error *error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); return error; } @@ -2686,22 +2791,9 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream); - static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, - grpc_error 
*error_ignored) { - grpc_chttp2_incoming_byte_stream *bs = byte_stream; - grpc_chttp2_stream *s = bs->stream; - grpc_chttp2_transport *t = s->t; - - GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); - incoming_byte_stream_unref(exec_ctx, bs); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); -} + grpc_error *error_ignored); static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { @@ -2768,6 +2860,33 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished( return error; } +static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, bs, error, true /* reset_on_error */)); +} + +static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { + incoming_byte_stream_next, incoming_byte_stream_pull, + incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; + +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, + void *byte_stream, + grpc_error *error_ignored) { + grpc_chttp2_incoming_byte_stream *bs = byte_stream; + grpc_chttp2_stream *s = bs->stream; + grpc_chttp2_transport *t = s->t; + + GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); + incoming_byte_stream_unref(exec_ctx, bs); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); +} + grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags) { @@ -2776,9 +2895,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->base.length = frame_size; incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; - incoming_byte_stream->base.next = incoming_byte_stream_next; - incoming_byte_stream->base.pull = incoming_byte_stream_pull; - incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; + incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable; gpr_ref_init(&incoming_byte_stream->refs, 2); incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index dead6be77f..222d2177b2 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -293,7 +293,6 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice slice, int is_last) { - /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */ if (!s->pending_byte_stream) { grpc_slice_ref_internal(slice); grpc_slice_buffer_add(&s->frame_storage, slice); @@ -304,6 +303,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_NONE); s->on_next = NULL; + s->unprocessed_incoming_frames_decompressed = false; } else { grpc_slice_ref_internal(slice); 
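
Several hunks in this change replace the literal 5 with GRPC_HEADER_SIZE_IN_BYTES (defined in internal.h below). The constant names the gRPC length-prefixed message header: one compressed-flag byte followed by a four-byte big-endian length, which is what perform_stream_op_locked() writes into frame_hdr[]. A round-trip sketch of that framing, with invented helper names:

#include <stdint.h>
#include <stdio.h>

#define GRPC_HEADER_SIZE_IN_BYTES 5

static void write_grpc_header(uint8_t *hdr, int compressed, uint32_t len) {
  hdr[0] = (uint8_t)(compressed != 0); /* GRPC_WRITE_INTERNAL_COMPRESS bit */
  hdr[1] = (uint8_t)(len >> 24);       /* length, big-endian */
  hdr[2] = (uint8_t)(len >> 16);
  hdr[3] = (uint8_t)(len >> 8);
  hdr[4] = (uint8_t)len;
}

static uint32_t read_grpc_header(const uint8_t *hdr, int *compressed) {
  *compressed = hdr[0] != 0;
  return ((uint32_t)hdr[1] << 24) | ((uint32_t)hdr[2] << 16) |
         ((uint32_t)hdr[3] << 8) | (uint32_t)hdr[4];
}

int main(void) {
  uint8_t hdr[GRPC_HEADER_SIZE_IN_BYTES];
  write_grpc_header(hdr, 1, 1234);
  int compressed;
  uint32_t len = read_grpc_header(hdr, &compressed);
  printf("compressed=%d length=%u\n", compressed, len);
  return 0;
}

This is also why the decompression paths above pass GRPC_HEADER_SIZE_IN_BYTES as the minimum output: the parser needs at least one whole header before it can make progress.
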
grpc_slice_buffer_add(&s->frame_storage, slice); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 4563b78e75..b538d1df17 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -526,6 +526,26 @@ struct grpc_chttp2_stream { grpc_chttp2_write_cb *on_write_finished_cbs; grpc_chttp2_write_cb *finish_after_write; size_t sending_bytes; + + /** Whether stream compression send is enabled */ + bool stream_compression_recv_enabled; + /** Whether stream compression recv is enabled */ + bool stream_compression_send_enabled; + /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed + */ + bool unprocessed_incoming_frames_decompressed; + /** Stream compression decompress context */ + grpc_stream_compression_context *stream_decompression_ctx; + /** Stream compression compress context */ + grpc_stream_compression_context *stream_compression_ctx; + + /** Buffer storing data that is compressed but not sent */ + grpc_slice_buffer *compressed_data_buffer; + /** Amount of uncompressed bytes sent out when compressed_data_buffer is + * emptied */ + size_t uncompressed_data_size; + /** Temporary buffer storing decompressed data */ + grpc_slice_buffer *decompressed_data_buffer; }; /** Transport writing call flow: @@ -621,6 +641,9 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, grpc_closure **pclosure, grpc_error *error, const char *desc); +#define GRPC_HEADER_SIZE_IN_BYTES 5 +#define MAX_SIZE_T (~(size_t)0) + #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 315f2a67a2..c3ede08343 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -303,7 +303,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ - if (s->flow_controlled_buffer.length > 0) { + if (s->flow_controlled_buffer.length > 0 || + (s->stream_compression_send_enabled && + s->compressed_data_buffer->length > 0)) { uint32_t stream_outgoing_window = (uint32_t)GPR_MAX( 0, s->outgoing_window_delta + @@ -314,21 +316,63 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], GPR_MIN(stream_outgoing_window, t->outgoing_window)); if (max_outgoing > 0) { - uint32_t send_bytes = - (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); - bool is_last_data_frame = - s->fetching_send_message == NULL && - send_bytes == s->flow_controlled_buffer.length; - bool is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, - is_last_frame, &s->stats.outgoing, - &t->outbuf); - GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, - send_bytes); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, - send_bytes); + bool is_last_data_frame = false; + bool is_last_frame = false; + if (s->stream_compression_send_enabled) { + while ((s->flow_controlled_buffer.length > 0 || + s->compressed_data_buffer->length > 0) && + max_outgoing > 0) { + if (s->compressed_data_buffer->length > 0) { + 
uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->compressed_data_buffer->length); + is_last_data_frame = + (send_bytes == s->compressed_data_buffer->length && + s->flow_controlled_buffer.length == 0 && + s->fetching_send_message == NULL); + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, s->compressed_data_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM( + "write", t, s, outgoing_window_delta, send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, + send_bytes); + max_outgoing -= send_bytes; + if (s->compressed_data_buffer->length == 0) { + s->sending_bytes += s->uncompressed_data_size; + } + } else { + if (s->stream_compression_ctx == NULL) { + s->stream_compression_ctx = + grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_COMPRESS); + } + s->uncompressed_data_size = s->flow_controlled_buffer.length; + GPR_ASSERT(grpc_stream_compress( + s->stream_compression_ctx, &s->flow_controlled_buffer, + s->compressed_data_buffer, NULL, MAX_SIZE_T, + GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); + } + } + } else { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->flow_controlled_buffer.length); + is_last_data_frame = s->fetching_send_message == NULL && + send_bytes == s->flow_controlled_buffer.length; + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, + send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, + send_bytes); + s->sending_bytes += send_bytes; + } t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { @@ -345,9 +389,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &s->stats.outgoing)); } } - s->sending_bytes += send_bytes; now_writing = true; - if (s->flow_controlled_buffer.length > 0) { + if (s->flow_controlled_buffer.length > 0 || + (s->stream_compression_send_enabled && + s->compressed_data_buffer->length > 0)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } @@ -361,7 +406,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } if (s->send_trailing_metadata != NULL && s->fetching_send_message == NULL && - s->flow_controlled_buffer.length == 0) { + s->flow_controlled_buffer.length == 0 && + (!s->stream_compression_send_enabled || + s->compressed_data_buffer->length == 0)) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 14498021eb..6f4b429ee2 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -72,6 +72,7 @@ typedef struct sb_list_entry { typedef struct { grpc_byte_stream base; sb_list_entry *le; + grpc_error *shutdown_error; } inproc_slice_byte_stream; typedef struct { @@ -201,24 +202,39 @@ static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs, grpc_slice *slice) { 
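
The byte-stream changes in this commit (chttp2 above, inproc just below) also move from per-instance function pointers (base.next, base.pull, base.destroy) to a single shared grpc_byte_stream_vtable pointer, which makes room for the new shutdown() operation without growing every live stream. A compact sketch of that vtable pattern, using a simplified invented interface:

#include <stdio.h>

typedef struct byte_stream byte_stream;

typedef struct {
  int (*pull)(byte_stream *bs);
  void (*shutdown)(byte_stream *bs);
  void (*destroy)(byte_stream *bs);
} byte_stream_vtable;

struct byte_stream {
  const byte_stream_vtable *vtable; /* one pointer instead of three */
  int remaining;
};

static int counting_pull(byte_stream *bs) { return bs->remaining--; }
static void counting_shutdown(byte_stream *bs) { bs->remaining = 0; }
static void counting_destroy(byte_stream *bs) { (void)bs; }

static const byte_stream_vtable counting_vtable = {
    counting_pull, counting_shutdown, counting_destroy};

int main(void) {
  byte_stream bs = {&counting_vtable, 3};
  while (bs.remaining > 0) printf("pulled %d\n", bs.vtable->pull(&bs));
  bs.vtable->destroy(&bs);
  return 0;
}

Adding an operation now costs one new vtable slot plus one static table per implementation, rather than another pointer in every stream instance; it also lets asserts check identity against the shared vtable, as incoming_byte_stream_destroy_locked() does below.
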
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + if (stream->shutdown_error != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(stream->shutdown_error); + } *slice = grpc_slice_buffer_take_first(&stream->le->sb); return GRPC_ERROR_NONE; } +static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *bs, + grpc_error *error) { + inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + GRPC_ERROR_UNREF(stream->shutdown_error); + stream->shutdown_error = error; +} + static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs) { inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; sb_list_entry_destroy(exec_ctx, stream->le); + GRPC_ERROR_UNREF(stream->shutdown_error); } +static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = { + inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull, + inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy}; + void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, sb_list_entry *le) { s->base.length = (uint32_t)le->sb.length; s->base.flags = 0; - s->base.next = inproc_slice_byte_stream_next; - s->base.pull = inproc_slice_byte_stream_pull; - s->base.destroy = inproc_slice_byte_stream_destroy; + s->base.vtable = &inproc_slice_byte_stream_vtable; s->le = le; + s->shutdown_error = GRPC_ERROR_NONE; } static void ref_transport(inproc_transport *t) { @@ -956,11 +972,18 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(grpc_byte_stream_next(exec_ctx, op->payload->send_message.send_message, SIZE_MAX, &unused)); - grpc_byte_stream_pull(exec_ctx, op->payload->send_message.send_message, - &message_slice); + error = grpc_byte_stream_pull( + exec_ctx, op->payload->send_message.send_message, &message_slice); + if (error != GRPC_ERROR_NONE) { + cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error)); + break; + } + GPR_ASSERT(error == GRPC_ERROR_NONE); remaining -= GRPC_SLICE_LENGTH(message_slice); grpc_slice_buffer_add(dest, message_slice); } while (remaining != 0); + grpc_byte_stream_destroy(exec_ctx, + op->payload->send_message.send_message); } if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md diff --git a/src/core/lib/iomgr/nameser.h b/src/core/lib/iomgr/nameser.h new file mode 100644 index 0000000000..daed6de518 --- /dev/null +++ b/src/core/lib/iomgr/nameser.h @@ -0,0 +1,104 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_NAMESER_H +#define GRPC_CORE_LIB_IOMGR_NAMESER_H + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_ARPA_NAMESER + +#include <arpa/nameser.h> + +#else /* GRPC_HAVE_ARPA_NAMESER */ + +typedef enum __ns_class { + ns_c_invalid = 0, /* Cookie. */ + ns_c_in = 1, /* Internet. */ + ns_c_2 = 2, /* unallocated/unsupported. */ + ns_c_chaos = 3, /* MIT Chaos-net. */ + ns_c_hs = 4, /* MIT Hesiod. 
 */
+  /* Query class values which do not appear in resource records */
+  ns_c_none = 254, /* for prereq. sections in update requests */
+  ns_c_any = 255,  /* Wildcard match. */
+  ns_c_max = 65536
+} ns_class;
+
+typedef enum __ns_type {
+  ns_t_invalid = 0,   /* Cookie. */
+  ns_t_a = 1,         /* Host address. */
+  ns_t_ns = 2,        /* Authoritative server. */
+  ns_t_md = 3,        /* Mail destination. */
+  ns_t_mf = 4,        /* Mail forwarder. */
+  ns_t_cname = 5,     /* Canonical name. */
+  ns_t_soa = 6,       /* Start of authority zone. */
+  ns_t_mb = 7,        /* Mailbox domain name. */
+  ns_t_mg = 8,        /* Mail group member. */
+  ns_t_mr = 9,        /* Mail rename name. */
+  ns_t_null = 10,     /* Null resource record. */
+  ns_t_wks = 11,      /* Well known service. */
+  ns_t_ptr = 12,      /* Domain name pointer. */
+  ns_t_hinfo = 13,    /* Host information. */
+  ns_t_minfo = 14,    /* Mailbox information. */
+  ns_t_mx = 15,       /* Mail routing information. */
+  ns_t_txt = 16,      /* Text strings. */
+  ns_t_rp = 17,       /* Responsible person. */
+  ns_t_afsdb = 18,    /* AFS cell database. */
+  ns_t_x25 = 19,      /* X_25 calling address. */
+  ns_t_isdn = 20,     /* ISDN calling address. */
+  ns_t_rt = 21,       /* Router. */
+  ns_t_nsap = 22,     /* NSAP address. */
+  ns_t_nsap_ptr = 23, /* Reverse NSAP lookup (deprecated). */
+  ns_t_sig = 24,      /* Security signature. */
+  ns_t_key = 25,      /* Security key. */
+  ns_t_px = 26,       /* X.400 mail mapping. */
+  ns_t_gpos = 27,     /* Geographical position (withdrawn). */
+  ns_t_aaaa = 28,     /* Ip6 Address. */
+  ns_t_loc = 29,      /* Location Information. */
+  ns_t_nxt = 30,      /* Next domain (security). */
+  ns_t_eid = 31,      /* Endpoint identifier. */
+  ns_t_nimloc = 32,   /* Nimrod Locator. */
+  ns_t_srv = 33,      /* Server Selection. */
+  ns_t_atma = 34,     /* ATM Address */
+  ns_t_naptr = 35,    /* Naming Authority PoinTeR */
+  ns_t_kx = 36,       /* Key Exchange */
+  ns_t_cert = 37,     /* Certification record */
+  ns_t_a6 = 38,       /* IPv6 address (deprecates AAAA) */
+  ns_t_dname = 39,    /* Non-terminal DNAME (for IPv6) */
+  ns_t_sink = 40,     /* Kitchen sink (experimental) */
+  ns_t_opt = 41,      /* EDNS0 option (meta-RR) */
+  ns_t_apl = 42,      /* Address prefix list (RFC3123) */
+  ns_t_ds = 43,       /* Delegation Signer (RFC4034) */
+  ns_t_sshfp = 44,    /* SSH Key Fingerprint (RFC4255) */
+  ns_t_rrsig = 46,    /* Resource Record Signature (RFC4034) */
+  ns_t_nsec = 47,     /* Next Secure (RFC4034) */
+  ns_t_dnskey = 48,   /* DNS Public Key (RFC4034) */
+  ns_t_tkey = 249,    /* Transaction key */
+  ns_t_tsig = 250,    /* Transaction signature. */
+  ns_t_ixfr = 251,    /* Incremental zone transfer. */
+  ns_t_axfr = 252,    /* Transfer zone of authority. */
+  ns_t_mailb = 253,   /* Transfer mailbox records. */
+  ns_t_maila = 254,   /* Transfer mail agent records. */
+  ns_t_any = 255,     /* Wildcard match. */
+  ns_t_zxfr = 256,    /* BIND-specific, nonstandard.
*/ + ns_t_max = 65536 +} ns_type; + +#endif /* GRPC_HAVE_ARPA_NAMESER */ + +#endif /* GRPC_CORE_LIB_IOMGR_NAMESER_H */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index f5d15b4850..c12058f890 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -24,6 +24,7 @@ #if defined(GRPC_UV) // Do nothing #elif defined(GPR_MANYLINUX1) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 @@ -51,6 +52,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_LINUX) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 @@ -82,6 +84,7 @@ #define GRPC_POSIX_SOCKETUTILS #endif #elif defined(GPR_APPLE) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 @@ -93,6 +96,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_FREEBSD) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 @@ -104,6 +108,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_NACL) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 diff --git a/src/core/lib/support/avl.c b/src/core/lib/support/avl.c index a6178fdbce..0e28b24c98 100644 --- a/src/core/lib/support/avl.c +++ b/src/core/lib/support/avl.c @@ -39,15 +39,16 @@ static gpr_avl_node *ref_node(gpr_avl_node *node) { return node; } -static void unref_node(const gpr_avl_vtable *vtable, gpr_avl_node *node) { +static void unref_node(const gpr_avl_vtable *vtable, gpr_avl_node *node, + void *user_data) { if (node == NULL) { return; } if (gpr_unref(&node->refs)) { - vtable->destroy_key(node->key); - vtable->destroy_value(node->value); - unref_node(vtable, node->left); - unref_node(vtable, node->right); + vtable->destroy_key(node->key, user_data); + vtable->destroy_value(node->value, user_data); + unref_node(vtable, node->left, user_data); + unref_node(vtable, node->right, user_data); gpr_free(node); } } @@ -87,30 +88,30 @@ gpr_avl_node *new_node(void *key, void *value, gpr_avl_node *left, } static gpr_avl_node *get(const gpr_avl_vtable *vtable, gpr_avl_node *node, - void *key) { + void *key, void *user_data) { long cmp; if (node == NULL) { return NULL; } - cmp = vtable->compare_keys(node->key, key); + cmp = vtable->compare_keys(node->key, key, user_data); if (cmp == 0) { return node; } else if (cmp > 0) { - return get(vtable, node->left, key); + return get(vtable, node->left, key, user_data); } else { - return get(vtable, node->right, key); + return get(vtable, node->right, key, user_data); } } -void *gpr_avl_get(gpr_avl avl, void *key) { - gpr_avl_node *node = get(avl.vtable, avl.root, key); +void *gpr_avl_get(gpr_avl avl, void *key, void *user_data) { + gpr_avl_node *node = get(avl.vtable, avl.root, key, user_data); return node ? 
node->value : NULL; } -int gpr_avl_maybe_get(gpr_avl avl, void *key, void **value) { - gpr_avl_node *node = get(avl.vtable, avl.root, key); +int gpr_avl_maybe_get(gpr_avl avl, void *key, void **value, void *user_data) { + gpr_avl_node *node = get(avl.vtable, avl.root, key, user_data); if (node != NULL) { *value = node->value; return 1; @@ -120,70 +121,75 @@ int gpr_avl_maybe_get(gpr_avl avl, void *key, void **value) { static gpr_avl_node *rotate_left(const gpr_avl_vtable *vtable, void *key, void *value, gpr_avl_node *left, - gpr_avl_node *right) { - gpr_avl_node *n = - new_node(vtable->copy_key(right->key), vtable->copy_value(right->value), - new_node(key, value, left, ref_node(right->left)), - ref_node(right->right)); - unref_node(vtable, right); + gpr_avl_node *right, void *user_data) { + gpr_avl_node *n = new_node(vtable->copy_key(right->key, user_data), + vtable->copy_value(right->value, user_data), + new_node(key, value, left, ref_node(right->left)), + ref_node(right->right)); + unref_node(vtable, right, user_data); return n; } static gpr_avl_node *rotate_right(const gpr_avl_vtable *vtable, void *key, void *value, gpr_avl_node *left, - gpr_avl_node *right) { - gpr_avl_node *n = new_node( - vtable->copy_key(left->key), vtable->copy_value(left->value), - ref_node(left->left), new_node(key, value, ref_node(left->right), right)); - unref_node(vtable, left); + gpr_avl_node *right, void *user_data) { + gpr_avl_node *n = + new_node(vtable->copy_key(left->key, user_data), + vtable->copy_value(left->value, user_data), ref_node(left->left), + new_node(key, value, ref_node(left->right), right)); + unref_node(vtable, left, user_data); return n; } static gpr_avl_node *rotate_left_right(const gpr_avl_vtable *vtable, void *key, void *value, gpr_avl_node *left, - gpr_avl_node *right) { + gpr_avl_node *right, void *user_data) { /* rotate_right(..., rotate_left(left), right) */ - gpr_avl_node *n = new_node( - vtable->copy_key(left->right->key), - vtable->copy_value(left->right->value), - new_node(vtable->copy_key(left->key), vtable->copy_value(left->value), - ref_node(left->left), ref_node(left->right->left)), - new_node(key, value, ref_node(left->right->right), right)); - unref_node(vtable, left); + gpr_avl_node *n = + new_node(vtable->copy_key(left->right->key, user_data), + vtable->copy_value(left->right->value, user_data), + new_node(vtable->copy_key(left->key, user_data), + vtable->copy_value(left->value, user_data), + ref_node(left->left), ref_node(left->right->left)), + new_node(key, value, ref_node(left->right->right), right)); + unref_node(vtable, left, user_data); return n; } static gpr_avl_node *rotate_right_left(const gpr_avl_vtable *vtable, void *key, void *value, gpr_avl_node *left, - gpr_avl_node *right) { + gpr_avl_node *right, void *user_data) { /* rotate_left(..., left, rotate_right(right)) */ - gpr_avl_node *n = new_node( - vtable->copy_key(right->left->key), - vtable->copy_value(right->left->value), - new_node(key, value, left, ref_node(right->left->left)), - new_node(vtable->copy_key(right->key), vtable->copy_value(right->value), - ref_node(right->left->right), ref_node(right->right))); - unref_node(vtable, right); + gpr_avl_node *n = + new_node(vtable->copy_key(right->left->key, user_data), + vtable->copy_value(right->left->value, user_data), + new_node(key, value, left, ref_node(right->left->left)), + new_node(vtable->copy_key(right->key, user_data), + vtable->copy_value(right->value, user_data), + ref_node(right->left->right), ref_node(right->right))); + unref_node(vtable, 
right, user_data); return n; } static gpr_avl_node *rebalance(const gpr_avl_vtable *vtable, void *key, void *value, gpr_avl_node *left, - gpr_avl_node *right) { + gpr_avl_node *right, void *user_data) { switch (node_height(left) - node_height(right)) { case 2: if (node_height(left->left) - node_height(left->right) == -1) { return assert_invariants( - rotate_left_right(vtable, key, value, left, right)); + rotate_left_right(vtable, key, value, left, right, user_data)); } else { - return assert_invariants(rotate_right(vtable, key, value, left, right)); + return assert_invariants( + rotate_right(vtable, key, value, left, right, user_data)); } case -2: if (node_height(right->left) - node_height(right->right) == 1) { return assert_invariants( - rotate_right_left(vtable, key, value, left, right)); + rotate_right_left(vtable, key, value, left, right, user_data)); } else { - return assert_invariants(rotate_left(vtable, key, value, left, right)); + return assert_invariants( + rotate_left(vtable, key, value, left, right, user_data)); } default: return assert_invariants(new_node(key, value, left, right)); @@ -191,30 +197,32 @@ static gpr_avl_node *rebalance(const gpr_avl_vtable *vtable, void *key, } static gpr_avl_node *add_key(const gpr_avl_vtable *vtable, gpr_avl_node *node, - void *key, void *value) { + void *key, void *value, void *user_data) { long cmp; if (node == NULL) { return new_node(key, value, NULL, NULL); } - cmp = vtable->compare_keys(node->key, key); + cmp = vtable->compare_keys(node->key, key, user_data); if (cmp == 0) { return new_node(key, value, ref_node(node->left), ref_node(node->right)); } else if (cmp > 0) { - return rebalance( - vtable, vtable->copy_key(node->key), vtable->copy_value(node->value), - add_key(vtable, node->left, key, value), ref_node(node->right)); + return rebalance(vtable, vtable->copy_key(node->key, user_data), + vtable->copy_value(node->value, user_data), + add_key(vtable, node->left, key, value, user_data), + ref_node(node->right), user_data); } else { - return rebalance(vtable, vtable->copy_key(node->key), - vtable->copy_value(node->value), ref_node(node->left), - add_key(vtable, node->right, key, value)); + return rebalance( + vtable, vtable->copy_key(node->key, user_data), + vtable->copy_value(node->value, user_data), ref_node(node->left), + add_key(vtable, node->right, key, value, user_data), user_data); } } -gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value) { +gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value, void *user_data) { gpr_avl_node *old_root = avl.root; - avl.root = add_key(avl.vtable, avl.root, key, value); + avl.root = add_key(avl.vtable, avl.root, key, value, user_data); assert_invariants(avl.root); - unref_node(avl.vtable, old_root); + unref_node(avl.vtable, old_root, user_data); return avl; } @@ -233,12 +241,13 @@ static gpr_avl_node *in_order_tail(gpr_avl_node *node) { } static gpr_avl_node *remove_key(const gpr_avl_vtable *vtable, - gpr_avl_node *node, void *key) { + gpr_avl_node *node, void *key, + void *user_data) { long cmp; if (node == NULL) { return NULL; } - cmp = vtable->compare_keys(node->key, key); + cmp = vtable->compare_keys(node->key, key, user_data); if (cmp == 0) { if (node->left == NULL) { return ref_node(node->right); @@ -246,39 +255,45 @@ static gpr_avl_node *remove_key(const gpr_avl_vtable *vtable, return ref_node(node->left); } else if (node->left->height < node->right->height) { gpr_avl_node *h = in_order_head(node->right); - return rebalance(vtable, vtable->copy_key(h->key), - 
vtable->copy_value(h->value), ref_node(node->left), - remove_key(vtable, node->right, h->key)); + return rebalance( + vtable, vtable->copy_key(h->key, user_data), + vtable->copy_value(h->value, user_data), ref_node(node->left), + remove_key(vtable, node->right, h->key, user_data), user_data); } else { gpr_avl_node *h = in_order_tail(node->left); - return rebalance( - vtable, vtable->copy_key(h->key), vtable->copy_value(h->value), - remove_key(vtable, node->left, h->key), ref_node(node->right)); + return rebalance(vtable, vtable->copy_key(h->key, user_data), + vtable->copy_value(h->value, user_data), + remove_key(vtable, node->left, h->key, user_data), + ref_node(node->right), user_data); } } else if (cmp > 0) { - return rebalance( - vtable, vtable->copy_key(node->key), vtable->copy_value(node->value), - remove_key(vtable, node->left, key), ref_node(node->right)); + return rebalance(vtable, vtable->copy_key(node->key, user_data), + vtable->copy_value(node->value, user_data), + remove_key(vtable, node->left, key, user_data), + ref_node(node->right), user_data); } else { - return rebalance(vtable, vtable->copy_key(node->key), - vtable->copy_value(node->value), ref_node(node->left), - remove_key(vtable, node->right, key)); + return rebalance( + vtable, vtable->copy_key(node->key, user_data), + vtable->copy_value(node->value, user_data), ref_node(node->left), + remove_key(vtable, node->right, key, user_data), user_data); } } -gpr_avl gpr_avl_remove(gpr_avl avl, void *key) { +gpr_avl gpr_avl_remove(gpr_avl avl, void *key, void *user_data) { gpr_avl_node *old_root = avl.root; - avl.root = remove_key(avl.vtable, avl.root, key); + avl.root = remove_key(avl.vtable, avl.root, key, user_data); assert_invariants(avl.root); - unref_node(avl.vtable, old_root); + unref_node(avl.vtable, old_root, user_data); return avl; } -gpr_avl gpr_avl_ref(gpr_avl avl) { +gpr_avl gpr_avl_ref(gpr_avl avl, void *user_data) { ref_node(avl.root); return avl; } -void gpr_avl_unref(gpr_avl avl) { unref_node(avl.vtable, avl.root); } +void gpr_avl_unref(gpr_avl avl, void *user_data) { + unref_node(avl.vtable, avl.root, user_data); +} int gpr_avl_is_empty(gpr_avl avl) { return avl.root == NULL; } diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index ef8405cca8..55934964f3 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -18,6 +18,7 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" @@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->cq = cq; alarm->tag = tag; - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2365d27307..04613f17e3 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1422,7 +1422,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1723,7 +1723,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_REF(call, 
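The gpr_avl changes above mechanically thread a new user_data pointer from every public entry point (gpr_avl_add, gpr_avl_get, gpr_avl_maybe_get, gpr_avl_remove, gpr_avl_ref, gpr_avl_unref) into every vtable callback, so per-instance context no longer has to live in globals. A sketch of a vtable written against the new signatures; the case-folding option and the unchanged one-argument gpr_avl_create() are assumptions of this example:

#include <stdbool.h>
#include <string.h>
#include <strings.h>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
#include <grpc/support/string_util.h>

typedef struct { bool case_insensitive; } cmp_opts; /* passed as user_data */

static void str_destroy(void *p, void *user_data) { gpr_free(p); }
static void *str_copy(void *p, void *user_data) {
  return gpr_strdup((const char *)p);
}
static long str_cmp(void *a, void *b, void *user_data) {
  const cmp_opts *opts = (const cmp_opts *)user_data;
  return opts->case_insensitive ? strcasecmp(a, b) : strcmp(a, b);
}

/* Field order: destroy_key, copy_key, compare_keys, destroy_value,
   copy_value. */
static const gpr_avl_vtable str_avl_vtable = {str_destroy, str_copy, str_cmp,
                                              str_destroy, str_copy};

static void example(void) {
  cmp_opts opts = {true};
  gpr_avl map = gpr_avl_create(&str_avl_vtable);
  map = gpr_avl_add(map, gpr_strdup("Key"), gpr_strdup("value"), &opts);
  const char *v = (const char *)gpr_avl_get(map, (void *)"key", &opts);
  (void)v; /* "value": the lookup matched case-insensitively */
  gpr_avl_unref(map, &opts);
}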
"completion"); if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); @@ -1844,6 +1844,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) { return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; + case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN: + return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN"; case GRPC_CALL_OK: return "GRPC_CALL_OK"; } diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 80eb80af78..e85b308850 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 978d7b4171..3d82a32e82 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -196,7 +196,7 @@ typedef struct cq_vtable { void (*init)(void *data); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); void (*destroy)(void *data); - void (*begin_op)(grpc_completion_queue *cq, void *tag); + bool (*begin_op)(grpc_completion_queue *cq, void *tag); void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, } } -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { - cq_next_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); -} - -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_ref(&cqd->pending_events); -} - -void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { -#ifndef NDEBUG - gpr_mu_lock(cq->mu); - if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { - cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); - cq->outstanding_tags = - gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * - cq->outstanding_tag_capacity); - } - cq->outstanding_tags[cq->outstanding_tag_count++] = tag; - gpr_mu_unlock(cq->mu); -#endif - cq->vtable->begin_op(cq, tag); -} - #ifndef NDEBUG static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { int found = 0; @@ -576,6 +549,41 @@ static void 
cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + while (true) { + gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events); + if (count == 0) { + return false; + } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) { + break; + } + } + return true; +} + +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); + GPR_ASSERT(!cqd->shutdown_called); + gpr_ref(&cqd->pending_events); + return true; +} + +bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG + gpr_mu_lock(cq->mu); + if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { + cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); + cq->outstanding_tags = + gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * + cq->outstanding_tag_capacity); + } + cq->outstanding_tags[cq->outstanding_tag_count++] = tag; + gpr_mu_unlock(cq->mu); +#endif + return cq->vtable->begin_op(cq, tag); +} + /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ @@ -855,8 +863,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, inconsistent state. If it is the latter, we should do a 0-timeout poll so that the thread comes back quickly from poll to make a second attempt at popping. Not doing this can potentially deadlock this - thread - forever (if the deadline is infinity) */ + thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); } @@ -869,10 +876,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, if (cq_event_queue_num_items(&cqd->queue) > 0) { /* Go to the beginning of the loop. 
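The lock-free cq_begin_op_for_next() above is the heart of this patch: the old version asserted !shutdown_called and did an unconditional fetch-add, which raced with shutdown, while the new one only increments pending_events while it is still nonzero, so once shutdown drains the count to zero no further operation can begin. The same increment-if-nonzero idiom in isolation, as a sketch (try_begin() is a name invented here):

#include <stdbool.h>
#include <grpc/support/atm.h>

/* Atomically increments *counter unless it has already reached zero.
   Returns false when zero was observed, i.e. shutdown won the race. */
static bool try_begin(gpr_atm *counter) {
  while (true) {
    gpr_atm count = gpr_atm_no_barrier_load(counter);
    if (count == 0) return false;
    /* The CAS fails if another thread moved the counter first; reload
       and retry. */
    if (gpr_atm_no_barrier_cas(counter, count, count + 1)) return true;
  }
}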
No point doing a poll because (cq->shutdown == true) is only possible when there is no pending - work - (i.e cq->pending_events == 0) and any outstanding - grpc_cq_completion - events are already queued on this cq */ + work (i.e. cq->pending_events == 0) and any outstanding completion + events should have already been queued on this cq */ continue; } @@ -909,11 +914,6 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, is_finished_arg.first_loop = false; } - GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(is_finished_arg.stolen_completion == NULL); - if (cq_event_queue_num_items(&cqd->queue) > 0 && gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); @@ -921,6 +921,11 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, gpr_mu_unlock(cq->mu); } + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); + GPR_TIMER_END("grpc_completion_queue_next", 0); return ret; diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index af44482513..69d144bd95 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corresponding grpc_cq_end_* call is made. - \a tag is currently used only in debug builds. */ -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); + \a tag is currently used only in debug builds. Return true on success, and + false if completion_queue has been shut down. 
*/ +bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index fce7f8dca1..66dcc299aa 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, } /* stay locked, and gather up some stuff to do */ - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } details->reserved = NULL; rc->cq_idx = cq_idx; rc->type = BATCH_CALL; @@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index 3355814017..fb03a10315 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -19,29 +19,37 @@ #include "src/core/lib/transport/byte_stream.h" #include <stdlib.h> +#include <string.h> #include <grpc/support/log.h> #include "src/core/lib/slice/slice_internal.h" -int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete) { - return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete); +bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete) { + return byte_stream->vtable->next(exec_ctx, byte_stream, max_size_hint, + on_complete); } grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice) { - return byte_stream->pull(exec_ctx, byte_stream, slice); + return byte_stream->vtable->pull(exec_ctx, byte_stream, slice); +} + +void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + byte_stream->vtable->shutdown(exec_ctx, byte_stream, error); } void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { - byte_stream->destroy(exec_ctx, byte_stream); + byte_stream->vtable->destroy(exec_ctx, byte_stream); } -/* slice_buffer_stream */ +// grpc_slice_buffer_stream static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, @@ -56,6 +64,9 @@ static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + if (stream->shutdown_error != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(stream->shutdown_error); + } GPR_ASSERT(stream->cursor < stream->backing_buffer->count); *slice = 
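With grpc_cq_begin_op() now able to fail, its call sites split into two patterns: paths where the queue is known to be alive keep a GPR_ASSERT wrapper (alarm, ping, connectivity watch, shutdown-and-notify), while grpc_server_request_call and grpc_server_request_registered_call above translate a false return into the new GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN error. A condensed sketch of the second pattern; the surrounding request_something() function is hypothetical:

#include <grpc/grpc.h>
#include "src/core/lib/surface/completion_queue.h"

static grpc_call_error request_something(grpc_completion_queue *cq, void *tag) {
  if (!grpc_cq_begin_op(cq, tag)) {
    /* The queue has already shut down, so grpc_cq_end_op() may never be
       posted for this tag; fail the request instead of leaking it. */
    return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
  }
  /* ... hand off the real work, which must eventually invoke
     grpc_cq_end_op() with this same cq/tag pair ... */
  return GRPC_CALL_OK;
}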
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); @@ -63,8 +74,23 @@ static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } +static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + GRPC_ERROR_UNREF(stream->shutdown_error); + stream->shutdown_error = error; +} + static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream) {} + grpc_byte_stream *byte_stream) { + grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + GRPC_ERROR_UNREF(stream->shutdown_error); +} + +static const grpc_byte_stream_vtable slice_buffer_stream_vtable = { + slice_buffer_stream_next, slice_buffer_stream_pull, + slice_buffer_stream_shutdown, slice_buffer_stream_destroy}; void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, grpc_slice_buffer *slice_buffer, @@ -72,9 +98,89 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, GPR_ASSERT(slice_buffer->length <= UINT32_MAX); stream->base.length = (uint32_t)slice_buffer->length; stream->base.flags = flags; - stream->base.next = slice_buffer_stream_next; - stream->base.pull = slice_buffer_stream_pull; - stream->base.destroy = slice_buffer_stream_destroy; + stream->base.vtable = &slice_buffer_stream_vtable; stream->backing_buffer = slice_buffer; stream->cursor = 0; + stream->shutdown_error = GRPC_ERROR_NONE; +} + +// grpc_caching_byte_stream + +void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache, + grpc_byte_stream *underlying_stream) { + cache->underlying_stream = underlying_stream; + grpc_slice_buffer_init(&cache->cache_buffer); +} + +void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream_cache *cache) { + grpc_byte_stream_destroy(exec_ctx, cache->underlying_stream); + grpc_slice_buffer_destroy_internal(exec_ctx, &cache->cache_buffer); +} + +static bool caching_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + size_t max_size_hint, + grpc_closure *on_complete) { + grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + if (stream->shutdown_error != GRPC_ERROR_NONE) return true; + if (stream->cursor < stream->cache->cache_buffer.count) return true; + return grpc_byte_stream_next(exec_ctx, stream->cache->underlying_stream, + max_size_hint, on_complete); +} + +static grpc_error *caching_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice) { + grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + if (stream->shutdown_error != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(stream->shutdown_error); + } + if (stream->cursor < stream->cache->cache_buffer.count) { + *slice = grpc_slice_ref_internal( + stream->cache->cache_buffer.slices[stream->cursor]); + ++stream->cursor; + return GRPC_ERROR_NONE; + } + grpc_error *error = + grpc_byte_stream_pull(exec_ctx, stream->cache->underlying_stream, slice); + if (error == GRPC_ERROR_NONE) { + ++stream->cursor; + grpc_slice_buffer_add(&stream->cache->cache_buffer, + grpc_slice_ref_internal(*slice)); + } + return error; +} + +static void caching_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + GRPC_ERROR_UNREF(stream->shutdown_error); + stream->shutdown_error = 
GRPC_ERROR_REF(error); + grpc_byte_stream_shutdown(exec_ctx, stream->cache->underlying_stream, error); +} + +static void caching_byte_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream) { + grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + GRPC_ERROR_UNREF(stream->shutdown_error); +} + +static const grpc_byte_stream_vtable caching_byte_stream_vtable = { + caching_byte_stream_next, caching_byte_stream_pull, + caching_byte_stream_shutdown, caching_byte_stream_destroy}; + +void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream, + grpc_byte_stream_cache *cache) { + memset(stream, 0, sizeof(*stream)); + stream->base.length = cache->underlying_stream->length; + stream->base.flags = cache->underlying_stream->flags; + stream->base.vtable = &caching_byte_stream_vtable; + stream->cache = cache; + stream->shutdown_error = GRPC_ERROR_NONE; +} + +void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream) { + stream->cursor = 0; } diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index f172296e4b..1e1e8310b8 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -28,52 +28,109 @@ /** Mask of all valid internal flags. */ #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) -struct grpc_byte_stream; typedef struct grpc_byte_stream grpc_byte_stream; -struct grpc_byte_stream { - uint32_t length; - uint32_t flags; +typedef struct { bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, size_t max_size_hint, grpc_closure *on_complete); grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + grpc_error *error); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); +} grpc_byte_stream_vtable; + +struct grpc_byte_stream { + uint32_t length; + uint32_t flags; + const grpc_byte_stream_vtable *vtable; }; -/* returns 1 if the bytes are available immediately (in which case - * on_complete will not be called), 0 if the bytes will be available - * asynchronously. - * - * max_size_hint can be set as a hint as to the maximum number - * of bytes that would be acceptable to read. - */ -int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete); +// Returns true if the bytes are available immediately (in which case +// on_complete will not be called), false if the bytes will be available +// asynchronously. +// +// max_size_hint can be set as a hint as to the maximum number +// of bytes that would be acceptable to read. +bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete); -/* returns the next slice in the byte stream when it is ready (indicated by - * either grpc_byte_stream_next returning 1 or on_complete passed to - * grpc_byte_stream_next is called). - * - * once a slice is returned into *slice, it is owned by the caller. - */ +// Returns the next slice in the byte stream when it is ready (indicated by +// either grpc_byte_stream_next returning true or the on_complete passed to +// grpc_byte_stream_next being invoked). +// +// Once a slice is returned into *slice, it is owned by the caller. grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice); +// Shuts down the byte stream. 
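For callers, the next()/pull() contract is unchanged by the move to a vtable: next() may report that data is already resident, and pull() then hands out one caller-owned slice at a time (now failing with the shutdown error where applicable). A sketch of a synchronous drain loop under those rules; drain_ready_bytes() is invented for this example, and the asynchronous branch simply defers to on_complete:

#include <grpc/slice_buffer.h>
#include "src/core/lib/transport/byte_stream.h"

static grpc_error *drain_ready_bytes(grpc_exec_ctx *exec_ctx,
                                     grpc_byte_stream *stream,
                                     grpc_slice_buffer *out,
                                     grpc_closure *on_complete) {
  while (out->length < stream->length) {
    if (!grpc_byte_stream_next(exec_ctx, stream, stream->length,
                               on_complete)) {
      return GRPC_ERROR_NONE; /* not ready; on_complete will fire later */
    }
    grpc_slice slice;
    grpc_error *error = grpc_byte_stream_pull(exec_ctx, stream, &slice);
    if (error != GRPC_ERROR_NONE) return error; /* e.g. the shutdown error */
    grpc_slice_buffer_add(out, slice); /* takes ownership of the slice */
  }
  return GRPC_ERROR_NONE;
}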
+// +// If there is a pending call to on_complete from grpc_byte_stream_next(), +// it will be invoked with the error passed to grpc_byte_stream_shutdown(). +// +// The next call to grpc_byte_stream_pull() (if any) will return the error +// passed to grpc_byte_stream_shutdown(). +void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error); + void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); -/* grpc_byte_stream that wraps a slice buffer */ +// grpc_slice_buffer_stream +// +// A grpc_byte_stream that wraps a slice buffer. + typedef struct grpc_slice_buffer_stream { grpc_byte_stream base; grpc_slice_buffer *backing_buffer; size_t cursor; + grpc_error *shutdown_error; } grpc_slice_buffer_stream; void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, grpc_slice_buffer *slice_buffer, uint32_t flags); +// grpc_caching_byte_stream +// +// A grpc_byte_stream that wraps an underlying byte stream but caches +// the resulting slices in a slice buffer. If an initial attempt fails +// without fully draining the underlying stream, a new caching stream +// can be created from the same underlying cache, in which case it will +// return whatever is in the backing buffer before continuing to read the +// underlying stream. +// +// NOTE: No synchronization is done, so it is not safe to have multiple +// grpc_caching_byte_streams drawing from the same underlying +// grpc_byte_stream_cache at the same time. + +typedef struct { + grpc_byte_stream *underlying_stream; + grpc_slice_buffer cache_buffer; +} grpc_byte_stream_cache; + +// Takes ownership of underlying_stream. +void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache, + grpc_byte_stream *underlying_stream); + +// Must not be called while still in use by a grpc_caching_byte_stream. +void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream_cache *cache); + +typedef struct { + grpc_byte_stream base; + grpc_byte_stream_cache *cache; + size_t cursor; + grpc_error *shutdown_error; +} grpc_caching_byte_stream; + +void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream, + grpc_byte_stream_cache *cache); + +// Resets the byte stream to the start of the underlying stream. 
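Putting the cache pieces together, the retry shape this enables looks roughly like the following sketch; the surrounding retry driver is hypothetical, and ownership follows the comments above (the cache takes the underlying stream, and the caller keeps the cache alive until no caching stream uses it):

#include "src/core/lib/transport/byte_stream.h"

static void sketch_retryable_send(grpc_exec_ctx *exec_ctx,
                                  grpc_byte_stream *send_message) {
  grpc_byte_stream_cache cache;
  grpc_byte_stream_cache_init(&cache, send_message); /* takes ownership */

  grpc_caching_byte_stream attempt;
  grpc_caching_byte_stream_init(&attempt, &cache);
  /* ... attempt #1 reads from &attempt.base; each pulled slice is also
     appended to cache.cache_buffer ... */

  /* Attempt #1 failed mid-stream: rewind. Cached slices are replayed
     first, then reading continues on the underlying stream. */
  grpc_caching_byte_stream_reset(&attempt);
  /* ... attempt #2 reads from &attempt.base again ... */

  grpc_byte_stream_cache_destroy(exec_ctx, &cache); /* when done */
}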
+void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream); + #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 7281602d66..6c61f4b8d9 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -207,27 +207,35 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, return transport->vtable->get_endpoint(exec_ctx, transport); } +// This comment should be sung to the tune of +// "Supercalifragilisticexpialidocious": +// // grpc_transport_stream_op_batch_finish_with_failure // is a function that must always unref cancel_error // though it lives in lib, it handles transport stream ops sure // it's grpc_transport_stream_op_batch_finish_with_failure void grpc_transport_stream_op_batch_finish_with_failure( - grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch, grpc_error *error) { - if (op->recv_message) { - GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready, + if (batch->send_message) { + grpc_byte_stream_destroy(exec_ctx, + batch->payload->send_message.send_message); + } + if (batch->recv_message) { + GRPC_CLOSURE_SCHED(exec_ctx, + batch->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); } - if (op->recv_initial_metadata) { + if (batch->recv_initial_metadata) { GRPC_CLOSURE_SCHED( exec_ctx, - op->payload->recv_initial_metadata.recv_initial_metadata_ready, + batch->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } - GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, error); - if (op->cancel_stream) { - GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error); + GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error); + if (batch->cancel_stream) { + GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); } } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 84e53e683a..099138ea14 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -159,6 +159,11 @@ struct grpc_transport_stream_op_batch_payload { } send_trailing_metadata; struct { + // The transport (or a filter that decides to return a failure before + // the op gets down to the transport) is responsible for calling + // grpc_byte_stream_destroy() on this. + // The batch's on_complete will not be called until after the byte + // stream is destroyed. grpc_byte_stream *send_message; } send_message; @@ -174,6 +179,10 @@ struct grpc_transport_stream_op_batch_payload { } recv_initial_metadata; struct { + // Will be set by the transport to point to the byte stream + // containing a received message. + // The caller is responsible for calling grpc_byte_stream_destroy() + // on this byte stream. grpc_byte_stream **recv_message; /** Should be enqueued when one message is ready to be processed. */ grpc_closure *recv_message_ready;
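The new payload comments pin down byte-stream ownership on both sides of the batch: the transport (or a filter failing the batch early, as grpc_transport_stream_op_batch_finish_with_failure now does for send_message) destroys the send stream, while whatever surfaces in recv_message belongs to the caller. A sketch of the receive side honoring that rule; recv_message_done() and its argument wiring are invented for this example:

static void recv_message_done(grpc_exec_ctx *exec_ctx, void *arg,
                              grpc_error *error) {
  grpc_byte_stream **recv_message = (grpc_byte_stream **)arg;
  if (error == GRPC_ERROR_NONE && *recv_message != NULL) {
    /* ... consume *recv_message via grpc_byte_stream_next()/pull() ... */
    grpc_byte_stream_destroy(exec_ctx, *recv_message); /* caller's duty */
    *recv_message = NULL;
  }
}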