diff options
Diffstat (limited to 'src/core/ext')
53 files changed, 761 insertions, 569 deletions
diff --git a/src/core/ext/census/census_log.h b/src/core/ext/census/census_log.h index 534ecc5705..1b185a53b9 100644 --- a/src/core/ext/census/census_log.h +++ b/src/core/ext/census/census_log.h @@ -84,7 +84,7 @@ const void *census_log_read_next(size_t *bytes_available); */ size_t census_log_remaining_space(void); -/* Returns the number of times gprc_stats_log_start_write() failed due to +/* Returns the number of times grpc_stats_log_start_write() failed due to out-of-space. */ int census_log_out_of_space_count(void); diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index a4cf6f37bd..397dbc40a8 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -37,9 +37,9 @@ #include <string.h> #include <grpc/census.h> +#include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice.h> #include <grpc/support/time.h> #include "src/core/ext/census/census_interface.h" @@ -69,7 +69,7 @@ static void extract_and_annotate_method_tag(grpc_metadata_batch *md, for (m = md->list.head; m != NULL; m = m->next) { if (m->md->key == GRPC_MDSTR_PATH) { gpr_log(GPR_DEBUG, "%s", - (const char *)GPR_SLICE_START_PTR(m->md->value->slice)); + (const char *)GRPC_SLICE_START_PTR(m->md->value->slice)); /* Add method tag here */ } } @@ -191,6 +191,7 @@ const grpc_channel_filter grpc_client_census_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { @@ -204,4 +205,5 @@ const grpc_channel_filter grpc_server_census_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "census-server"}; diff --git a/src/core/ext/census/mlog.h b/src/core/ext/census/mlog.h index a256426f91..18805ad994 100644 --- a/src/core/ext/census/mlog.h +++ b/src/core/ext/census/mlog.h @@ -88,7 +88,7 @@ const void* census_log_read_next(size_t* bytes_available); */ size_t census_log_remaining_space(void); -/* Returns the number of times gprc_stats_log_start_write() failed due to +/* Returns the number of times grpc_stats_log_start_write() failed due to out-of-space. */ int64_t census_log_out_of_space_count(void); diff --git a/src/core/ext/census/trace_context.h b/src/core/ext/census/trace_context.h index ee71fef460..1cb5e26ea7 100644 --- a/src/core/ext/census/trace_context.h +++ b/src/core/ext/census/trace_context.h @@ -65,4 +65,4 @@ of these do not exist. On success, returns true and false otherwise. */ bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, const size_t nbytes); -#endif +#endif /* GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H */ diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index ff773ac334..b66fed4b88 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> @@ -123,6 +124,7 @@ typedef struct client_channel_channel_data { /** mutex protecting all variables below in this data structure */ gpr_mu mu; /** currently active load balancer */ + char *lb_policy_name; grpc_lb_policy *lb_policy; /** maps method names to method_parameters structs */ grpc_mdstr_hash_table *method_params_table; @@ -223,6 +225,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; + char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_mdstr_hash_table *method_params_table = NULL; @@ -236,7 +239,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, lb_policy_args.client_channel_factory = chand->client_channel_factory; // Find LB policy name. - const char *lb_policy_name = NULL; const grpc_arg *channel_arg = grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME); if (channel_arg != NULL) { @@ -289,6 +291,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, (grpc_method_config_table *)channel_arg->value.pointer.p, method_config_convert_value, &method_parameters_vtable); } + // Before we clean up, save a copy of lb_policy_name, since it might + // be pointing to data inside chand->resolver_result. + // The copy will be saved in chand->lb_policy_name below. + lb_policy_name = gpr_strdup(lb_policy_name); grpc_channel_args_destroy(chand->resolver_result); chand->resolver_result = NULL; } @@ -299,6 +305,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, } gpr_mu_lock(&chand->mu); + if (lb_policy_name != NULL) { + gpr_free(chand->lb_policy_name); + chand->lb_policy_name = lb_policy_name; + } old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (chand->method_params_table != NULL) { @@ -426,6 +436,19 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&chand->mu); } +static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *info) { + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->mu); + if (info->lb_policy_name != NULL) { + *info->lb_policy_name = chand->lb_policy_name == NULL + ? NULL + : gpr_strdup(chand->lb_policy_name); + } + gpr_mu_unlock(&chand->mu); +} + /* Constructor for channel_data */ static void cc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -465,6 +488,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } + gpr_free(chand->lb_policy_name); if (chand->method_params_table != NULL) { grpc_mdstr_hash_table_unref(chand->method_params_table); } @@ -617,7 +641,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->deadline, &subchannel_call); + calld->call_start_time, calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -870,7 +894,7 @@ retry: grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->deadline, &subchannel_call); + calld->call_start_time, calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -1052,6 +1076,7 @@ const grpc_channel_filter grpc_client_channel_filter = { cc_init_channel_elem, cc_destroy_channel_elem, cc_get_peer, + cc_get_channel_info, "client-channel", }; diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h index ed7d5450de..3de061620e 100644 --- a/src/core/ext/client_channel/connector.h +++ b/src/core/ext/client_channel/connector.h @@ -52,7 +52,7 @@ typedef struct { const grpc_resolved_address *addr; size_t addr_len; /** initial connect string to send */ - gpr_slice initial_connect_string; + grpc_slice initial_connect_string; /** deadline for connection */ gpr_timespec deadline; /** channel arguments (to be passed to transport) */ diff --git a/src/core/ext/client_channel/default_initial_connect_string.c b/src/core/ext/client_channel/default_initial_connect_string.c index 0b251372fd..6db82d84ef 100644 --- a/src/core/ext/client_channel/default_initial_connect_string.c +++ b/src/core/ext/client_channel/default_initial_connect_string.c @@ -31,8 +31,8 @@ * */ -#include <grpc/support/slice.h> +#include <grpc/slice.h> #include "src/core/lib/iomgr/resolve_address.h" void grpc_set_default_initial_connect_string(grpc_resolved_address **addr, - gpr_slice *initial_str) {} + grpc_slice *initial_str) {} diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index ea2cbbdd97..82042897b2 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -35,9 +35,9 @@ #include <string.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include "src/core/ext/client_channel/uri_parser.h" @@ -60,8 +60,8 @@ typedef struct http_connect_handshaker { void* user_data; // Objects for processing the HTTP CONNECT request and response. - gpr_slice_buffer write_buffer; - gpr_slice_buffer* read_buffer; // Ownership passes through this object. + grpc_slice_buffer write_buffer; + grpc_slice_buffer* read_buffer; // Ownership passes through this object. grpc_closure request_done_closure; grpc_closure response_read_closure; grpc_http_parser http_parser; @@ -76,7 +76,7 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { if (gpr_unref(&handshaker->refcount)) { gpr_free(handshaker->proxy_server); gpr_free(handshaker->server_name); - gpr_slice_buffer_destroy(&handshaker->write_buffer); + grpc_slice_buffer_destroy(&handshaker->write_buffer); grpc_http_parser_destroy(&handshaker->http_parser); grpc_http_response_destroy(&handshaker->http_response); gpr_free(handshaker); @@ -118,7 +118,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, } // Add buffer to parser. for (size_t i = 0; i < handshaker->read_buffer->count; ++i) { - if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) { + if (GRPC_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) { size_t body_start_offset = 0; error = grpc_http_parser_parse(&handshaker->http_parser, handshaker->read_buffer->slices[i], @@ -129,20 +129,20 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer); // Remove the data we've already read from the read buffer, // leaving only the leftover bytes (if any). - gpr_slice_buffer tmp_buffer; - gpr_slice_buffer_init(&tmp_buffer); + grpc_slice_buffer tmp_buffer; + grpc_slice_buffer_init(&tmp_buffer); if (body_start_offset < - GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) { - gpr_slice_buffer_add( + GRPC_SLICE_LENGTH(handshaker->read_buffer->slices[i])) { + grpc_slice_buffer_add( &tmp_buffer, - gpr_slice_split_tail(&handshaker->read_buffer->slices[i], - body_start_offset)); + grpc_slice_split_tail(&handshaker->read_buffer->slices[i], + body_start_offset)); } - gpr_slice_buffer_addn(&tmp_buffer, - &handshaker->read_buffer->slices[i + 1], - handshaker->read_buffer->count - i - 1); - gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer); - gpr_slice_buffer_destroy(&tmp_buffer); + grpc_slice_buffer_addn(&tmp_buffer, + &handshaker->read_buffer->slices[i + 1], + handshaker->read_buffer->count - i - 1); + grpc_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer); + grpc_slice_buffer_destroy(&tmp_buffer); break; } } @@ -159,7 +159,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, // complete (e.g., handling chunked transfer encoding or looking // at the Content-Length: header). if (handshaker->http_parser.state != GRPC_HTTP_BODY) { - gpr_slice_buffer_reset_and_unref(handshaker->read_buffer); + grpc_slice_buffer_reset_and_unref(handshaker->read_buffer); grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, &handshaker->response_read_closure); return; @@ -195,7 +195,7 @@ static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, static void http_connect_handshaker_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, gpr_timespec deadline, + grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; @@ -214,8 +214,8 @@ static void http_connect_handshaker_do_handshake( request.http.method = "CONNECT"; request.http.path = handshaker->server_name; request.handshaker = &grpc_httpcli_plaintext; - gpr_slice request_slice = grpc_httpcli_format_connect_request(&request); - gpr_slice_buffer_add(&handshaker->write_buffer, request_slice); + grpc_slice request_slice = grpc_httpcli_format_connect_request(&request); + grpc_slice_buffer_add(&handshaker->write_buffer, request_slice); grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer, &handshaker->request_done_closure); // Set timeout timer. The timer gets a reference to the handshaker. @@ -239,7 +239,7 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); handshaker->proxy_server = gpr_strdup(proxy_server); handshaker->server_name = gpr_strdup(server_name); - gpr_slice_buffer_init(&handshaker->write_buffer); + grpc_slice_buffer_init(&handshaker->write_buffer); grpc_closure_init(&handshaker->request_done_closure, on_write_done, handshaker); grpc_closure_init(&handshaker->response_read_closure, on_read_done, diff --git a/src/core/ext/client_channel/initial_connect_string.c b/src/core/ext/client_channel/initial_connect_string.c index fb1493d77d..8ebd06c458 100644 --- a/src/core/ext/client_channel/initial_connect_string.c +++ b/src/core/ext/client_channel/initial_connect_string.c @@ -36,7 +36,7 @@ #include <stddef.h> extern void grpc_set_default_initial_connect_string( - grpc_resolved_address **addr, gpr_slice *initial_str); + grpc_resolved_address **addr, grpc_slice *initial_str); static grpc_set_initial_connect_string_func g_set_initial_connect_string_func = grpc_set_default_initial_connect_string; @@ -47,6 +47,6 @@ void grpc_test_set_initial_connect_string_function( } void grpc_set_initial_connect_string(grpc_resolved_address **addr, - gpr_slice *initial_str) { + grpc_slice *initial_str) { g_set_initial_connect_string_func(addr, initial_str); } diff --git a/src/core/ext/client_channel/initial_connect_string.h b/src/core/ext/client_channel/initial_connect_string.h index 68adb0373c..876abea40e 100644 --- a/src/core/ext/client_channel/initial_connect_string.h +++ b/src/core/ext/client_channel/initial_connect_string.h @@ -34,17 +34,17 @@ #ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H #define GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H -#include <grpc/support/slice.h> - +#include <grpc/slice.h> #include "src/core/lib/iomgr/resolve_address.h" typedef void (*grpc_set_initial_connect_string_func)( - grpc_resolved_address **addr, gpr_slice *initial_str); + grpc_resolved_address **addr, grpc_slice *initial_str); + void grpc_test_set_initial_connect_string_function( grpc_set_initial_connect_string_func func); /** Set a string to be sent once connected. Optionally reset addr. */ void grpc_set_initial_connect_string(grpc_resolved_address **addr, - gpr_slice *connect_string); + grpc_slice *connect_string); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 789966cb69..a148b2a0e1 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -100,7 +100,7 @@ struct grpc_subchannel { grpc_subchannel_key *key; /** initial string to send to peer */ - gpr_slice initial_connect_string; + grpc_slice initial_connect_string; /** set during connection */ grpc_connect_out_args connecting_result; @@ -206,7 +206,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free((void *)c->filters); grpc_channel_args_destroy(c->args); gpr_free(c->addr); - gpr_slice_unref(c->initial_connect_string); + grpc_slice_unref(c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); grpc_pollset_set_destroy(c->pollset_set); @@ -702,15 +702,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline, - grpc_subchannel_call **call) { + grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time, + gpr_timespec deadline, grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. grpc_error *error = grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, deadline, callstk); + NULL, NULL, path, start_time, deadline, callstk); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 93bd72d20d..10bae620df 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -111,8 +111,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call */ grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline, - grpc_subchannel_call **subchannel_call); + grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time, + gpr_timespec deadline, grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c index bcb6a1dee4..0fbc542ef8 100644 --- a/src/core/ext/client_channel/uri_parser.c +++ b/src/core/ext/client_channel/uri_parser.c @@ -35,13 +35,14 @@ #include <string.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" /** a size_t default value... maps to all 1's */ @@ -148,38 +149,38 @@ static void parse_query_parts(grpc_uri *uri) { uri->num_query_parts = 0; return; } - gpr_slice query_slice = - gpr_slice_new(uri->query, strlen(uri->query), do_nothing); - gpr_slice_buffer query_parts; /* the &-separated elements of the query */ - gpr_slice_buffer query_param_parts; /* the =-separated subelements */ + grpc_slice query_slice = + grpc_slice_new(uri->query, strlen(uri->query), do_nothing); + grpc_slice_buffer query_parts; /* the &-separated elements of the query */ + grpc_slice_buffer query_param_parts; /* the =-separated subelements */ - gpr_slice_buffer_init(&query_parts); - gpr_slice_buffer_init(&query_param_parts); + grpc_slice_buffer_init(&query_parts); + grpc_slice_buffer_init(&query_param_parts); - gpr_slice_split(query_slice, QUERY_PARTS_SEPARATOR, &query_parts); + grpc_slice_split(query_slice, QUERY_PARTS_SEPARATOR, &query_parts); uri->query_parts = gpr_malloc(query_parts.count * sizeof(char *)); uri->query_parts_values = gpr_malloc(query_parts.count * sizeof(char *)); uri->num_query_parts = query_parts.count; for (size_t i = 0; i < query_parts.count; i++) { - gpr_slice_split(query_parts.slices[i], QUERY_PARTS_VALUE_SEPARATOR, - &query_param_parts); + grpc_slice_split(query_parts.slices[i], QUERY_PARTS_VALUE_SEPARATOR, + &query_param_parts); GPR_ASSERT(query_param_parts.count > 0); uri->query_parts[i] = - gpr_dump_slice(query_param_parts.slices[0], GPR_DUMP_ASCII); + grpc_dump_slice(query_param_parts.slices[0], GPR_DUMP_ASCII); if (query_param_parts.count > 1) { /* TODO(dgq): only the first value after the separator is considered. * Perhaps all chars after the first separator for the query part should * be included, even if they include the separator. */ uri->query_parts_values[i] = - gpr_dump_slice(query_param_parts.slices[1], GPR_DUMP_ASCII); + grpc_dump_slice(query_param_parts.slices[1], GPR_DUMP_ASCII); } else { uri->query_parts_values[i] = NULL; } - gpr_slice_buffer_reset_and_unref(&query_param_parts); + grpc_slice_buffer_reset_and_unref(&query_param_parts); } - gpr_slice_buffer_destroy(&query_parts); - gpr_slice_buffer_destroy(&query_param_parts); - gpr_slice_unref(query_slice); + grpc_slice_buffer_destroy(&query_parts); + grpc_slice_buffer_destroy(&query_param_parts); + grpc_slice_unref(query_slice); } grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 6249f8b80f..d8ef0c8098 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -116,6 +116,7 @@ #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/backoff.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" @@ -185,6 +186,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, * addresses failed to connect). There won't be any user_data/token * available */ if (wc_arg->target != NULL) { + GPR_ASSERT(wc_arg->lb_token != NULL); initial_metadata_add_lb_token(wc_arg->initial_metadata, wc_arg->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); @@ -604,10 +606,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, * right grpclb status. */ rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; + gpr_mu_lock(&glb_policy->mu); if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN && !glb_policy->shutting_down) { - gpr_mu_lock(&glb_policy->mu); /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_conn_data->state, GRPC_ERROR_REF(error), @@ -616,12 +618,12 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, &rr_conn_data->on_change); - gpr_mu_unlock(&glb_policy->mu); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "rr_connectivity_cb"); gpr_free(rr_conn_data); } + gpr_mu_unlock(&glb_policy->mu); } static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, @@ -759,17 +761,24 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->rr_policy) { GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); } - if (glb_policy->started_picking) { - if (glb_policy->lb_call != NULL) { - grpc_call_cancel(glb_policy->lb_call, NULL); - /* lb_on_server_status_received will pick up the cancel and clean up */ - } - } grpc_connectivity_state_set( exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown"); + /* We need a copy of the lb_call pointer because we can't cancell the call + * while holding glb_policy->mu: lb_on_server_status_received, invoked due to + * the cancel, needs to acquire that same lock */ + grpc_call *lb_call = glb_policy->lb_call; + glb_policy->lb_call = NULL; gpr_mu_unlock(&glb_policy->mu); + /* glb_policy->lb_call and this local lb_call must be consistent at this point + * because glb_policy->lb_call is only assigned in lb_call_init_locked as part + * of query_for_backends_locked, which can only be invoked while + * glb_policy->shutting_down is false. */ + if (lb_call != NULL) { + grpc_call_cancel(lb_call, NULL); + /* lb_on_server_status_received will pick up the cancel and clean up */ + } while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; @@ -953,9 +962,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void lb_call_init(glb_lb_policy *glb_policy) { +static void lb_call_init_locked(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); + GPR_ASSERT(!glb_policy->shutting_down); /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling @@ -971,10 +981,10 @@ static void lb_call_init(glb_lb_policy *glb_policy) { grpc_grpclb_request *request = grpc_grpclb_request_create(glb_policy->server_name); - gpr_slice request_payload_slice = grpc_grpclb_request_encode(request); + grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); glb_policy->lb_request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - gpr_slice_unref(request_payload_slice); + grpc_slice_unref(request_payload_slice); grpc_grpclb_request_destroy(request); glb_policy->lb_call_status_details = NULL; @@ -1008,7 +1018,9 @@ static void lb_call_destroy_locked(glb_lb_policy *glb_policy) { static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); - lb_call_init(glb_policy); + if (glb_policy->shutting_down) return; + + lb_call_init_locked(glb_policy); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", @@ -1080,19 +1092,20 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; + gpr_mu_lock(&glb_policy->mu); if (glb_policy->lb_response_payload != NULL) { gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); - gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); grpc_byte_buffer_destroy(glb_policy->lb_response_payload); grpc_grpclb_serverlist *serverlist = grpc_grpclb_response_parse_serverlist(response_slice); if (serverlist != NULL) { GPR_ASSERT(glb_policy->lb_call != NULL); - gpr_slice_unref(response_slice); + grpc_slice_unref(response_slice); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Serverlist with %lu servers received", (unsigned long)serverlist->num_servers); @@ -1108,7 +1121,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, /* update serverlist */ if (serverlist->num_servers > 0) { - gpr_mu_lock(&glb_policy->mu); if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, @@ -1124,7 +1136,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, rr_handover_locked(exec_ctx, glb_policy, error); } - gpr_mu_unlock(&glb_policy->mu); } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, @@ -1134,8 +1145,8 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, } } else { /* serverlist == NULL */ gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", - gpr_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); - gpr_slice_unref(response_slice); + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + grpc_slice_unref(response_slice); } if (!glb_policy->shutting_down) { @@ -1152,9 +1163,11 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } + gpr_mu_unlock(&glb_policy->mu); } else { /* empty payload: call cancelled. */ /* dispose of the "lb_on_response_received" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ + gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "lb_on_response_received_empty_payload"); } @@ -1174,7 +1187,6 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, query_for_backends_locked(exec_ctx, glb_policy); } gpr_mu_unlock(&glb_policy->mu); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_on_retry_timer"); } diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c index a8881004a0..837e9c1113 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c @@ -90,18 +90,18 @@ grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) { return req; } -gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { +grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { size_t encoded_length; pb_ostream_t sizestream; pb_ostream_t outputstream; - gpr_slice slice; + grpc_slice slice; memset(&sizestream, 0, sizeof(pb_ostream_t)); pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request); encoded_length = sizestream.bytes_written; - slice = gpr_slice_malloc(encoded_length); + slice = grpc_slice_malloc(encoded_length); outputstream = - pb_ostream_from_buffer(GPR_SLICE_START_PTR(slice), encoded_length); + pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length); GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields, request) != 0); return slice; @@ -113,10 +113,10 @@ void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response; grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( - gpr_slice encoded_grpc_grpclb_response) { + grpc_slice encoded_grpc_grpclb_response) { pb_istream_t stream = - pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_grpc_grpclb_response), - GPR_SLICE_LENGTH(encoded_grpc_grpclb_response)); + pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), + GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) { @@ -132,12 +132,12 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( } grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( - gpr_slice encoded_grpc_grpclb_response) { + grpc_slice encoded_grpc_grpclb_response) { bool status; decode_serverlist_arg arg; pb_istream_t stream = - pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_grpc_grpclb_response), - GPR_SLICE_LENGTH(encoded_grpc_grpclb_response)); + pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), + GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); pb_istream_t stream_at_start = stream; grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h index 079a64a3f3..b4c967e426 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h @@ -34,7 +34,7 @@ #ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H #define GRPC_CORE_EXT_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H -#include <grpc/support/slice_buffer.h> +#include <grpc/slice_buffer.h> #include "src/core/ext/client_channel/lb_policy_factory.h" #include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" @@ -60,7 +60,7 @@ typedef struct grpc_grpclb_serverlist { grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); /** Protocol Buffers v3-encode \a request */ -gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); +grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); /** Destroy \a request */ void grpc_grpclb_request_destroy(grpc_grpclb_request *request); @@ -68,11 +68,11 @@ void grpc_grpclb_request_destroy(grpc_grpclb_request *request); /** Parse (ie, decode) the bytes in \a encoded_grpc_grpclb_response as a \a * grpc_grpclb_initial_response */ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( - gpr_slice encoded_grpc_grpclb_response); + grpc_slice encoded_grpc_grpclb_response); /** Parse the list of servers from an encoded \a grpc_grpclb_response */ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( - gpr_slice encoded_grpc_grpclb_response); + grpc_slice encoded_grpc_grpclb_response); /** Return a copy of \a sl. The caller is responsible for calling \a * grpc_grpclb_destroy_serverlist on the returned copy. */ diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index ac3c6a305a..c69f773e78 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -292,6 +292,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } else { loop: switch (p->checking_connectivity) { + case GRPC_CHANNEL_INIT: + GPR_UNREACHABLE_CODE(return ); case GRPC_CHANNEL_READY: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 0fd3abe099..59f84054c4 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -116,8 +116,13 @@ typedef struct { grpc_closure connectivity_changed_closure; /** this subchannels current position in subchannel->ready_list */ ready_list *ready_list_node; - /** last observed connectivity */ - grpc_connectivity_state connectivity_state; + /** last observed connectivity. Not updated by + * \a grpc_subchannel_notify_on_state_change. Used to determine the previous + * state while processing the new state in \a rr_connectivity_changed */ + grpc_connectivity_state prev_connectivity_state; + /** current connectivity state. Updated by \a + * grpc_subchannel_notify_on_state_change */ + grpc_connectivity_state curr_connectivity_state; /** the subchannel's target user data */ void *user_data; /** vtable to operate over \a user_data */ @@ -127,6 +132,7 @@ typedef struct { struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; + gpr_mu mu; /** total number of addresses received at creation time */ size_t num_addresses; @@ -135,8 +141,11 @@ struct round_robin_lb_policy { size_t num_subchannels; subchannel_data **subchannels; - /** mutex protecting remaining members */ - gpr_mu mu; + /** how many subchannels are in TRANSIENT_FAILURE */ + size_t num_transient_failures; + /** how many subchannels are IDLE */ + size_t num_idle; + /** have we started picking? */ int started_picking; /** are we shutting down? */ @@ -258,6 +267,10 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } +static bool is_ready_list_empty(round_robin_lb_policy *p) { + return p->ready_list.prev == NULL; +} + static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *elem; @@ -268,7 +281,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); if (sd->user_data != NULL) { GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(sd->user_data); @@ -381,18 +394,18 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p, - p->num_subchannels); - } - for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - sd->connectivity_state = GRPC_CHANNEL_IDLE; + /* use some sentinel value outside of the range of grpc_connectivity_state + * to signal an undefined previous state. We won't be referring to this + * value again and it'll be overwritten after the first call to + * rr_connectivity_changed */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); } } @@ -422,7 +435,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* readily available, report right away */ *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); + "rr_picked"); if (user_data != NULL) { *user_data = selected->user_data; @@ -453,125 +466,184 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } +static void update_state_counters(subchannel_data *sd) { + round_robin_lb_policy *p = sd->policy; + + /* update p->num_transient_failures (resp. p->num_idle): if the previous + * state was TRANSIENT_FAILURE (resp. IDLE), decrement + * p->num_transient_failures (resp. p->num_idle). */ + if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(p->num_transient_failures > 0); + --p->num_transient_failures; + } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { + GPR_ASSERT(p->num_idle > 0); + --p->num_idle; + } +} + +/* sd is the subchannel_data associted with the updated subchannel. + * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE + * or SHUTDOWN */ +static grpc_connectivity_state update_lb_connectivity_status( + grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { + /* In priority order. The first rule to match terminates the search (ie, if we + * are on rule n, all previous rules were unfulfilled). + * + * 1) RULE: ANY subchannel is READY => policy is READY. + * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty. + * + * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. + * CHECK: sd->curr_connectivity_state == CONNECTING. + * + * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. + * CHECK: p->num_subchannels = 0. + * + * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is + * TRANSIENT_FAILURE. + * CHECK: p->num_transient_failures == p->num_subchannels. + * + * 5) RULE: ALL subchannels are IDLE => policy is IDLE. + * CHECK: p->num_idle == p->num_subchannels. + */ + round_robin_lb_policy *p = sd->policy; + if (!is_ready_list_empty(p)) { /* 1) READY */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "rr_ready"); + return GRPC_CHANNEL_READY; + } else if (sd->curr_connectivity_state == + GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + "rr_connecting"); + return GRPC_CHANNEL_CONNECTING; + } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "rr_shutdown"); + return GRPC_CHANNEL_SHUTDOWN; + } else if (p->num_transient_failures == + p->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "rr_transient_failure"); + return GRPC_CHANNEL_TRANSIENT_FAILURE; + } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, "rr_idle"); + return GRPC_CHANNEL_IDLE; + } + /* no change */ + return sd->curr_connectivity_state; +} + static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; - int unref = 0; - GRPC_ERROR_REF(error); gpr_mu_lock(&p->mu); if (p->shutdown) { - unref = 1; - } else { - switch (sd->connectivity_state) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, GRPC_ERROR_REF(error), - "connecting_ready"); - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd); - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - ready_list *selected = peek_next_connected_locked(p); - GPR_ASSERT(selected != NULL); - if (p->pending_picks != NULL) { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ - advance_last_picked_locked(p); + gpr_mu_unlock(&p->mu); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + GRPC_ERROR_UNREF(error); + return; + } + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_INIT: + GPR_UNREACHABLE_CODE(return ); + case GRPC_CHANNEL_READY: + /* add the newly connected subchannel to the list of connected ones. + * Note that it goes to the "end of the line". */ + sd->ready_list_node = add_connected_sc_locked(p, sd); + /* at this point we know there's at least one suitable subchannel. Go + * ahead and pick one and notify the pending suitors in + * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + ready_list *selected = peek_next_connected_locked(p); + GPR_ASSERT(selected != NULL); + if (p->pending_picks != NULL) { + /* if the selected subchannel is going to be used for the pending + * picks, update the last picked pointer */ + advance_last_picked_locked(p); + } + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(selected->subchannel), + "rr_picked"); + if (pp->user_data != NULL) { + *pp->user_data = selected->user_data; } - + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", + (void *)selected->subchannel, (void *)selected); + } + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + gpr_free(pp); + } + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_IDLE: + ++p->num_idle; + /* fallthrough */ + case GRPC_CHANNEL_CONNECTING: + update_state_counters(sd); + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + ++p->num_transient_failures; + /* remove from ready list if still present */ + if (sd->ready_list_node != NULL) { + remove_disconnected_sc_locked(p, sd->ready_list_node); + sd->ready_list_node = NULL; + } + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_SHUTDOWN: + update_state_counters(sd); + if (sd->ready_list_node != NULL) { + remove_disconnected_sc_locked(p, sd->ready_list_node); + sd->ready_list_node = NULL; + } + --p->num_subchannels; + GPR_SWAP(subchannel_data *, p->subchannels[sd->index], + p->subchannels[p->num_subchannels]); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); + p->subchannels[sd->index]->index = sd->index; + if (update_lb_connectivity_status(exec_ctx, sd, error) == + GRPC_CHANNEL_SHUTDOWN) { + /* the policy is shutting down. Flush all the pending picks... */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); - if (pp->user_data != NULL) { - *pp->user_data = selected->user_data; - } - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - (void *)selected->subchannel, (void *)selected); - } + *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, sd->connectivity_state, - GRPC_ERROR_REF(error), "connecting_changed"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - /* renew state notification */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - - /* remove from ready list if still present */ - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connecting_transient_failure"); - break; - case GRPC_CHANNEL_SHUTDOWN: - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - - p->num_subchannels--; - GPR_SWAP(subchannel_data *, p->subchannels[sd->index], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); - p->subchannels[sd->index]->index = sd->index; - gpr_free(sd); - - unref = 1; - if (p->num_subchannels == 0) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING("Round Robin Channels Exhausted", - &error, 1), - "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); - gpr_free(pp); - } - } else { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - } /* switch */ - } /* !unref */ - - gpr_mu_unlock(&p->mu); - - if (unref) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); + } + gpr_free(sd); + /* unref the "rr_connectivity" weak ref from start_picking */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + break; } - + gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } @@ -607,9 +679,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); + "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "picked"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { gpr_mu_unlock(&p->mu); grpc_exec_ctx_sched(exec_ctx, closure, @@ -705,6 +777,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", + (void *)p, (unsigned long)p->num_subchannels); + } gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index eeae2400fb..b810e20bb9 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -232,4 +232,5 @@ const grpc_channel_filter grpc_load_reporting_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "load_reporting"}; diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 5fec03a8e4..26a650aadd 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -47,6 +47,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" typedef struct { @@ -169,17 +170,17 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args, return NULL; } /* Construct addresses. */ - gpr_slice path_slice = - gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); - gpr_slice_buffer path_parts; - gpr_slice_buffer_init(&path_parts); - gpr_slice_split(path_slice, ",", &path_parts); + grpc_slice path_slice = + grpc_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); + grpc_slice_buffer path_parts; + grpc_slice_buffer_init(&path_parts); + grpc_slice_split(path_slice, ",", &path_parts); grpc_lb_addresses *addresses = grpc_lb_addresses_create(path_parts.count, NULL /* user_data_vtable */); bool errors_found = false; for (size_t i = 0; i < addresses->num_addresses; i++) { grpc_uri ith_uri = *args->uri; - char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); + char *part_str = grpc_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; if (!parse(&ith_uri, &addresses->addresses[i].address)) { errors_found = true; /* GPR_TRUE */ @@ -187,8 +188,8 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args, gpr_free(part_str); if (errors_found) break; } - gpr_slice_buffer_destroy(&path_parts); - gpr_slice_unref(path_slice); + grpc_slice_buffer_destroy(&path_parts); + grpc_slice_unref(path_slice); if (errors_found) { grpc_lb_addresses_destroy(addresses); return NULL; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 71a06e118b..8e03fd82c1 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -36,9 +36,9 @@ #include <stdlib.h> #include <string.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> #include "src/core/ext/client_channel/client_channel.h" #include "src/core/ext/client_channel/http_connect_handshaker.h" @@ -64,7 +64,7 @@ typedef struct { grpc_connect_in_args args; grpc_connect_out_args *result; grpc_closure initial_string_sent; - gpr_slice_buffer initial_string_buffer; + grpc_slice_buffer initial_string_buffer; grpc_endpoint *tcp; @@ -94,7 +94,7 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_channel_args *args, - gpr_slice_buffer *read_buffer, void *user_data, + grpc_slice_buffer *read_buffer, void *user_data, grpc_error *error) { connector *c = user_data; if (error != GRPC_ERROR_NONE) { @@ -117,12 +117,12 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; grpc_endpoint *tcp = c->tcp; if (tcp != NULL) { - if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) { + if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, c); - gpr_slice_buffer_init(&c->initial_string_buffer); - gpr_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); + grpc_slice_buffer_init(&c->initial_string_buffer); + grpc_slice_buffer_add(&c->initial_string_buffer, + c->args.initial_connect_string); connector_ref(arg); grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index d0ac72a011..04c88a2d36 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -36,9 +36,9 @@ #include <stdlib.h> #include <string.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> #include "src/core/ext/client_channel/client_channel.h" #include "src/core/ext/client_channel/http_connect_handshaker.h" @@ -68,7 +68,7 @@ typedef struct { grpc_connect_in_args args; grpc_connect_out_args *result; grpc_closure initial_string_sent; - gpr_slice_buffer initial_string_buffer; + grpc_slice_buffer initial_string_buffer; gpr_mu mu; grpc_endpoint *connecting_endpoint; @@ -131,7 +131,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_channel_args *args, - gpr_slice_buffer *read_buffer, void *user_data, + grpc_slice_buffer *read_buffer, void *user_data, grpc_error *error) { connector *c = user_data; c->tmp_args = args; @@ -166,12 +166,12 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(c->connecting_endpoint == NULL); c->connecting_endpoint = tcp; gpr_mu_unlock(&c->mu); - if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) { + if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, c); - gpr_slice_buffer_init(&c->initial_string_buffer); - gpr_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); + grpc_slice_buffer_init(&c->initial_string_buffer); + grpc_slice_buffer_add(&c->initial_string_buffer, + c->args.initial_connect_string); grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); } else { diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index d42611b863..c18d618f96 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -56,7 +56,7 @@ typedef struct server_connect_state { static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_channel_args *args, - gpr_slice_buffer *read_buffer, void *user_data, + grpc_slice_buffer *read_buffer, void *user_data, grpc_error *error) { server_connect_state *state = user_data; if (error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 7ad687042d..942638ad7f 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -117,7 +117,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_channel_args *args, - gpr_slice_buffer *read_buffer, void *user_data, + grpc_slice_buffer *read_buffer, void *user_data, grpc_error *error) { server_secure_connect *connection_state = user_data; if (error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c index 2d90b01cd8..3eef80b557 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c @@ -34,6 +34,7 @@ #include "src/core/ext/transport/chttp2/transport/bin_decoder.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" static uint8_t decode_table[] = { @@ -142,11 +143,11 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx) { return true; } -gpr_slice grpc_chttp2_base64_decode(gpr_slice input) { - size_t input_length = GPR_SLICE_LENGTH(input); +grpc_slice grpc_chttp2_base64_decode(grpc_slice input) { + size_t input_length = GRPC_SLICE_LENGTH(input); size_t output_length = input_length / 4 * 3; struct grpc_base64_decode_context ctx; - gpr_slice output; + grpc_slice output; if (input_length % 4 != 0) { gpr_log(GPR_ERROR, @@ -158,7 +159,7 @@ gpr_slice grpc_chttp2_base64_decode(gpr_slice input) { } if (input_length > 0) { - uint8_t *input_end = GPR_SLICE_END_PTR(input); + uint8_t *input_end = GRPC_SLICE_END_PTR(input); if (*(--input_end) == '=') { output_length--; if (*(--input_end) == '=') { @@ -166,30 +167,30 @@ gpr_slice grpc_chttp2_base64_decode(gpr_slice input) { } } } - output = gpr_slice_malloc(output_length); + output = grpc_slice_malloc(output_length); - ctx.input_cur = GPR_SLICE_START_PTR(input); - ctx.input_end = GPR_SLICE_END_PTR(input); - ctx.output_cur = GPR_SLICE_START_PTR(output); - ctx.output_end = GPR_SLICE_END_PTR(output); + ctx.input_cur = GRPC_SLICE_START_PTR(input); + ctx.input_end = GRPC_SLICE_END_PTR(input); + ctx.output_cur = GRPC_SLICE_START_PTR(output); + ctx.output_end = GRPC_SLICE_END_PTR(output); ctx.contains_tail = false; if (!grpc_base64_decode_partial(&ctx)) { - char *s = gpr_dump_slice(input, GPR_DUMP_ASCII); + char *s = grpc_dump_slice(input, GPR_DUMP_ASCII); gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s); gpr_free(s); - gpr_slice_unref(output); + grpc_slice_unref(output); return gpr_empty_slice(); } - GPR_ASSERT(ctx.output_cur == GPR_SLICE_END_PTR(output)); - GPR_ASSERT(ctx.input_cur == GPR_SLICE_END_PTR(input)); + GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output)); + GPR_ASSERT(ctx.input_cur == GRPC_SLICE_END_PTR(input)); return output; } -gpr_slice grpc_chttp2_base64_decode_with_length(gpr_slice input, - size_t output_length) { - size_t input_length = GPR_SLICE_LENGTH(input); - gpr_slice output = gpr_slice_malloc(output_length); +grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, + size_t output_length) { + size_t input_length = GRPC_SLICE_LENGTH(input); + grpc_slice output = grpc_slice_malloc(output_length); struct grpc_base64_decode_context ctx; // The length of a base64 string cannot be 4 * n + 1 @@ -199,7 +200,7 @@ gpr_slice grpc_chttp2_base64_decode_with_length(gpr_slice input, "grpc_chttp2_base64_decode_with_length has a length of %d, which " "has a tail of 1 byte.\n", (int)input_length); - gpr_slice_unref(output); + grpc_slice_unref(output); return gpr_empty_slice(); } @@ -209,24 +210,24 @@ gpr_slice grpc_chttp2_base64_decode_with_length(gpr_slice input, "than the max possible output length %d.\n", (int)output_length, (int)(input_length / 4 * 3 + tail_xtra[input_length % 4])); - gpr_slice_unref(output); + grpc_slice_unref(output); return gpr_empty_slice(); } - ctx.input_cur = GPR_SLICE_START_PTR(input); - ctx.input_end = GPR_SLICE_END_PTR(input); - ctx.output_cur = GPR_SLICE_START_PTR(output); - ctx.output_end = GPR_SLICE_END_PTR(output); + ctx.input_cur = GRPC_SLICE_START_PTR(input); + ctx.input_end = GRPC_SLICE_END_PTR(input); + ctx.output_cur = GRPC_SLICE_START_PTR(output); + ctx.output_end = GRPC_SLICE_END_PTR(output); ctx.contains_tail = true; if (!grpc_base64_decode_partial(&ctx)) { - char *s = gpr_dump_slice(input, GPR_DUMP_ASCII); + char *s = grpc_dump_slice(input, GPR_DUMP_ASCII); gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s); gpr_free(s); - gpr_slice_unref(output); + grpc_slice_unref(output); return gpr_empty_slice(); } - GPR_ASSERT(ctx.output_cur == GPR_SLICE_END_PTR(output)); - GPR_ASSERT(ctx.input_cur <= GPR_SLICE_END_PTR(input)); + GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output)); + GPR_ASSERT(ctx.input_cur <= GRPC_SLICE_END_PTR(input)); return output; } diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h index b9d40c9b74..83a90be519 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h @@ -34,7 +34,7 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H -#include <grpc/support/slice.h> +#include <grpc/slice.h> #include <stdbool.h> struct grpc_base64_decode_context { @@ -55,12 +55,12 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx); /* base64 decode a slice with pad chars. Returns a new slice, does not take ownership of the input. Returns an empty slice if decoding is failed. */ -gpr_slice grpc_chttp2_base64_decode(gpr_slice input); +grpc_slice grpc_chttp2_base64_decode(grpc_slice input); /* base64 decode a slice without pad chars, data length is needed. Returns a new slice, does not take ownership of the input. Returns an empty slice if decoding is failed. */ -gpr_slice grpc_chttp2_base64_decode_with_length(gpr_slice input, - size_t output_length); +grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, + size_t output_length); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */ diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.c b/src/core/ext/transport/chttp2/transport/bin_encoder.c index 1b43c28be1..af25a4352a 100644 --- a/src/core/ext/transport/chttp2/transport/bin_encoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.c @@ -61,14 +61,14 @@ static const b64_huff_sym huff_alphabet[64] = { static const uint8_t tail_xtra[3] = {0, 2, 3}; -gpr_slice grpc_chttp2_base64_encode(gpr_slice input) { - size_t input_length = GPR_SLICE_LENGTH(input); +grpc_slice grpc_chttp2_base64_encode(grpc_slice input) { + size_t input_length = GRPC_SLICE_LENGTH(input); size_t input_triplets = input_length / 3; size_t tail_case = input_length % 3; size_t output_length = input_triplets * 4 + tail_xtra[tail_case]; - gpr_slice output = gpr_slice_malloc(output_length); - uint8_t *in = GPR_SLICE_START_PTR(input); - char *out = (char *)GPR_SLICE_START_PTR(output); + grpc_slice output = grpc_slice_malloc(output_length); + uint8_t *in = GRPC_SLICE_START_PTR(input); + char *out = (char *)GRPC_SLICE_START_PTR(output); size_t i; /* encode full triplets */ @@ -100,27 +100,29 @@ gpr_slice grpc_chttp2_base64_encode(gpr_slice input) { break; } - GPR_ASSERT(out == (char *)GPR_SLICE_END_PTR(output)); - GPR_ASSERT(in == GPR_SLICE_END_PTR(input)); + GPR_ASSERT(out == (char *)GRPC_SLICE_END_PTR(output)); + GPR_ASSERT(in == GRPC_SLICE_END_PTR(input)); return output; } -gpr_slice grpc_chttp2_huffman_compress(gpr_slice input) { +grpc_slice grpc_chttp2_huffman_compress(grpc_slice input) { size_t nbits; uint8_t *in; uint8_t *out; - gpr_slice output; + grpc_slice output; uint32_t temp = 0; uint32_t temp_length = 0; nbits = 0; - for (in = GPR_SLICE_START_PTR(input); in != GPR_SLICE_END_PTR(input); ++in) { + for (in = GRPC_SLICE_START_PTR(input); in != GRPC_SLICE_END_PTR(input); + ++in) { nbits += grpc_chttp2_huffsyms[*in].length; } - output = gpr_slice_malloc(nbits / 8 + (nbits % 8 != 0)); - out = GPR_SLICE_START_PTR(output); - for (in = GPR_SLICE_START_PTR(input); in != GPR_SLICE_END_PTR(input); ++in) { + output = grpc_slice_malloc(nbits / 8 + (nbits % 8 != 0)); + out = GRPC_SLICE_START_PTR(output); + for (in = GRPC_SLICE_START_PTR(input); in != GRPC_SLICE_END_PTR(input); + ++in) { int sym = *in; temp <<= grpc_chttp2_huffsyms[sym].length; temp |= grpc_chttp2_huffsyms[sym].bits; @@ -141,7 +143,7 @@ gpr_slice grpc_chttp2_huffman_compress(gpr_slice input) { (uint8_t)(0xffu >> temp_length)); } - GPR_ASSERT(out == GPR_SLICE_END_PTR(output)); + GPR_ASSERT(out == GRPC_SLICE_END_PTR(output)); return output; } @@ -175,16 +177,17 @@ static void enc_add1(huff_out *out, uint8_t a) { enc_flush_some(out); } -gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input) { - size_t input_length = GPR_SLICE_LENGTH(input); +grpc_slice grpc_chttp2_base64_encode_and_huffman_compress_impl( + grpc_slice input) { + size_t input_length = GRPC_SLICE_LENGTH(input); size_t input_triplets = input_length / 3; size_t tail_case = input_length % 3; size_t output_syms = input_triplets * 4 + tail_xtra[tail_case]; size_t max_output_bits = 11 * output_syms; size_t max_output_length = max_output_bits / 8 + (max_output_bits % 8 != 0); - gpr_slice output = gpr_slice_malloc(max_output_length); - uint8_t *in = GPR_SLICE_START_PTR(input); - uint8_t *start_out = GPR_SLICE_START_PTR(output); + grpc_slice output = grpc_slice_malloc(max_output_length); + uint8_t *in = GRPC_SLICE_START_PTR(input); + uint8_t *start_out = GRPC_SLICE_START_PTR(output); huff_out out; size_t i; @@ -231,9 +234,9 @@ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input) { (uint8_t)(0xffu >> out.temp_length)); } - GPR_ASSERT(out.out <= GPR_SLICE_END_PTR(output)); - GPR_SLICE_SET_LENGTH(output, out.out - start_out); + GPR_ASSERT(out.out <= GRPC_SLICE_END_PTR(output)); + GRPC_SLICE_SET_LENGTH(output, out.out - start_out); - GPR_ASSERT(in == GPR_SLICE_END_PTR(input)); + GPR_ASSERT(in == GRPC_SLICE_END_PTR(input)); return output; } diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.h b/src/core/ext/transport/chttp2/transport/bin_encoder.h index 61ebbafa9a..9e143b46e2 100644 --- a/src/core/ext/transport/chttp2/transport/bin_encoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.h @@ -34,21 +34,22 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H -#include <grpc/support/slice.h> +#include <grpc/slice.h> /* base64 encode a slice. Returns a new slice, does not take ownership of the input */ -gpr_slice grpc_chttp2_base64_encode(gpr_slice input); +grpc_slice grpc_chttp2_base64_encode(grpc_slice input); /* Compress a slice with the static huffman encoder detailed in the hpack standard. Returns a new slice, does not take ownership of the input */ -gpr_slice grpc_chttp2_huffman_compress(gpr_slice input); +grpc_slice grpc_chttp2_huffman_compress(grpc_slice input); /* equivalent to: - gpr_slice x = grpc_chttp2_base64_encode(input); - gpr_slice y = grpc_chttp2_huffman_compress(x); - gpr_slice_unref(x); + grpc_slice x = grpc_chttp2_base64_encode(input); + grpc_slice y = grpc_chttp2_huffman_compress(x); + grpc_slice_unref(x); return y; */ -gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input); +grpc_slice grpc_chttp2_base64_encode_and_huffman_compress_impl( + grpc_slice input); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 4a9f806354..127e1cdc13 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -38,9 +38,9 @@ #include <stdio.h> #include <string.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> @@ -51,6 +51,7 @@ #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/timeout_encoding.h" @@ -143,12 +144,12 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_endpoint_destroy(exec_ctx, t->ep); - gpr_slice_buffer_destroy(&t->qbuf); + grpc_slice_buffer_destroy(&t->qbuf); - gpr_slice_buffer_destroy(&t->outbuf); + grpc_slice_buffer_destroy(&t->outbuf); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); - gpr_slice_buffer_destroy(&t->read_buffer); + grpc_slice_buffer_destroy(&t->read_buffer); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); @@ -243,9 +244,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, is_client ? "client_transport" : "server_transport"); - gpr_slice_buffer_init(&t->qbuf); + grpc_slice_buffer_init(&t->qbuf); - gpr_slice_buffer_init(&t->outbuf); + grpc_slice_buffer_init(&t->outbuf); grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked, @@ -264,7 +265,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); - gpr_slice_buffer_init(&t->read_buffer); + grpc_slice_buffer_init(&t->read_buffer); /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet @@ -286,8 +287,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->sent_local_settings = 0; if (is_client) { - gpr_slice_buffer_add(&t->outbuf, gpr_slice_from_copied_string( - GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( + GRPC_CHTTP2_CLIENT_CONNECT_STRING)); grpc_chttp2_initiate_write(exec_ctx, t, false, "initial_write"); } @@ -471,7 +472,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); grpc_chttp2_data_parser_init(&s->data_parser); - gpr_slice_buffer_init(&s->flow_controlled_buffer); + grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); grpc_closure_init(&s->complete_fetch, complete_fetch, s); grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s); @@ -531,7 +532,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser); grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]); grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]); - gpr_slice_buffer_destroy(&s->flow_controlled_buffer); + grpc_slice_buffer_destroy(&s->flow_controlled_buffer); GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); @@ -756,11 +757,11 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t goaway_error, - gpr_slice goaway_text) { - char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII); + grpc_slice goaway_text) { + char *msg = grpc_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII); GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg)); - gpr_slice_unref(goaway_text); + grpc_slice_unref(goaway_text); t->seen_goaway = 1; /* lie: use transient failure from the transport to indicate goaway has been * received */ @@ -884,8 +885,8 @@ static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { s->fetched_send_message_length += - (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); - gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); + (uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice); + grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); if (s->id != 0) { grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); } @@ -1036,7 +1037,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, "op.send_initial_metadata"); } } else { - s->send_trailing_metadata = NULL; + s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_CREATE( @@ -1056,7 +1057,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else { GPR_ASSERT(s->fetching_send_message == NULL); uint8_t *frame_hdr = - gpr_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); + grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); uint32_t flags = op->send_message->flags; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; size_t len = op->send_message->length; @@ -1196,7 +1197,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, p->id[7] = (uint8_t)(t->ping_counter & 0xff); t->ping_counter++; p->on_recv = on_recv; - gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); + grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); grpc_chttp2_initiate_write(exec_ctx, t, true, "send_ping"); } @@ -1220,7 +1221,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_error_code error, gpr_slice data) { + grpc_chttp2_error_code error, grpc_slice data) { t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data, &t->qbuf); @@ -1243,7 +1244,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, if (op->send_goaway) { send_goaway(exec_ctx, t, grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), - gpr_slice_ref(*op->goaway_message)); + grpc_slice_ref(*op->goaway_message)); } if (op->set_accept_stream) { @@ -1432,7 +1433,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, &grpc_status); if (s->id != 0) { - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream"); @@ -1445,7 +1446,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, free_msg = true; msg = grpc_error_string(due_to_error); } - gpr_slice msg_slice = gpr_slice_from_copied_string(msg); + grpc_slice msg_slice = grpc_slice_from_copied_string(msg); grpc_chttp2_fake_status(exec_ctx, t, s, grpc_status, &msg_slice); if (free_msg) grpc_error_free_string(msg); } @@ -1458,7 +1459,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_status_code status, - gpr_slice *slice) { + grpc_slice *slice) { if (status != GRPC_STATUS_OK) { s->seen_error = true; } @@ -1481,13 +1482,13 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &s->metadata_buffer[1], grpc_mdelem_from_metadata_strings( GRPC_MDSTR_GRPC_MESSAGE, - grpc_mdstr_from_slice(gpr_slice_ref(*slice)))); + grpc_mdstr_from_slice(grpc_slice_ref(*slice)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (slice) { - gpr_slice_unref(*slice); + grpc_slice_unref(*slice); } } @@ -1522,13 +1523,17 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_error *error) { error = removal_error(error, s, "Pending writes failed due to stream closure"); - s->fetching_send_message = NULL; + s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), "send_initial_metadata_finished"); + + s->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_trailing_metadata_finished, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); + + s->fetching_send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); @@ -1584,9 +1589,9 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { - gpr_slice hdr; - gpr_slice status_hdr; - gpr_slice message_pfx; + grpc_slice hdr; + grpc_slice status_hdr; + grpc_slice message_pfx; uint8_t *p; uint32_t len = 0; grpc_status_code grpc_status; @@ -1605,8 +1610,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, time we got around to sending this, so instead we ignore HPACK compression and just write the uncompressed bytes onto the wire. */ - status_hdr = gpr_slice_malloc(15 + (grpc_status >= 10)); - p = GPR_SLICE_START_PTR(status_hdr); + status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10)); + p = GRPC_SLICE_START_PTR(status_hdr); *p++ = 0x40; /* literal header */ *p++ = 11; /* len(grpc-status) */ *p++ = 'g'; @@ -1628,8 +1633,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = (uint8_t)('0' + (grpc_status / 10)); *p++ = (uint8_t)('0' + (grpc_status % 10)); } - GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); - len += (uint32_t)GPR_SLICE_LENGTH(status_hdr); + GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr)); + len += (uint32_t)GRPC_SLICE_LENGTH(status_hdr); const char *optional_message = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); @@ -1637,8 +1642,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (optional_message != NULL) { size_t msg_len = strlen(optional_message); GPR_ASSERT(msg_len < 127); - message_pfx = gpr_slice_malloc(15); - p = GPR_SLICE_START_PTR(message_pfx); + message_pfx = grpc_slice_malloc(15); + p = GRPC_SLICE_START_PTR(message_pfx); *p++ = 0x40; *p++ = 12; /* len(grpc-message) */ *p++ = 'g'; @@ -1654,13 +1659,13 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = 'g'; *p++ = 'e'; *p++ = (uint8_t)msg_len; - GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); - len += (uint32_t)GPR_SLICE_LENGTH(message_pfx); + GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx)); + len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx); len += (uint32_t)msg_len; } - hdr = gpr_slice_malloc(9); - p = GPR_SLICE_START_PTR(hdr); + hdr = grpc_slice_malloc(9); + p = GRPC_SLICE_START_PTR(hdr); *p++ = (uint8_t)(len >> 16); *p++ = (uint8_t)(len >> 8); *p++ = (uint8_t)(len); @@ -1670,16 +1675,16 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = (uint8_t)(s->id >> 16); *p++ = (uint8_t)(s->id >> 8); *p++ = (uint8_t)(s->id); - GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr)); + GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr)); - gpr_slice_buffer_add(&t->qbuf, hdr); - gpr_slice_buffer_add(&t->qbuf, status_hdr); + grpc_slice_buffer_add(&t->qbuf, hdr); + grpc_slice_buffer_add(&t->qbuf, status_hdr); if (optional_message) { - gpr_slice_buffer_add(&t->qbuf, message_pfx); - gpr_slice_buffer_add(&t->qbuf, - gpr_slice_from_copied_string(optional_message)); + grpc_slice_buffer_add(&t->qbuf, message_pfx); + grpc_slice_buffer_add(&t->qbuf, + grpc_slice_from_copied_string(optional_message)); } - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR, &s->stats.outgoing)); } @@ -1690,7 +1695,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, free_msg = true; msg = grpc_error_string(error); } - gpr_slice msg_slice = gpr_slice_from_copied_string(msg); + grpc_slice msg_slice = grpc_slice_from_copied_string(msg); grpc_chttp2_fake_status(exec_ctx, t, s, grpc_status, &msg_slice); if (free_msg) grpc_error_free_string(msg); @@ -1861,7 +1866,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } - gpr_slice_buffer_reset_and_unref(&t->read_buffer); + grpc_slice_buffer_reset_and_unref(&t->read_buffer); if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin); @@ -1915,7 +1920,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { GRPC_ERROR_UNREF(bs->error); - gpr_slice_buffer_destroy(&bs->slices); + grpc_slice_buffer_destroy(&bs->slices); gpr_mu_destroy(&bs->slice_mu); gpr_free(bs); } @@ -1977,7 +1982,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } gpr_mu_lock(&bs->slice_mu); if (bs->slices.count > 0) { - *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices); + *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices); grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); } else if (bs->error != GRPC_ERROR_NONE) { grpc_closure_run(exec_ctx, bs->next_action.on_complete, @@ -1992,7 +1997,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - gpr_slice *slice, size_t max_size_hint, + grpc_slice *slice, size_t max_size_hint, grpc_closure *on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); grpc_chttp2_incoming_byte_stream *bs = @@ -2045,19 +2050,19 @@ static void incoming_byte_stream_publish_error( void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - gpr_slice slice) { + grpc_slice slice) { gpr_mu_lock(&bs->slice_mu); - if (bs->remaining_bytes < GPR_SLICE_LENGTH(slice)) { + if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { incoming_byte_stream_publish_error( exec_ctx, bs, GRPC_ERROR_CREATE("Too many bytes in stream")); } else { - bs->remaining_bytes -= (uint32_t)GPR_SLICE_LENGTH(slice); + bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); if (bs->on_next != NULL) { *bs->next = slice; grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { - gpr_slice_buffer_add(&bs->slices, slice); + grpc_slice_buffer_add(&bs->slices, slice); } } gpr_mu_unlock(&bs->slice_mu); @@ -2095,7 +2100,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; gpr_ref(&incoming_byte_stream->stream->active_streams); - gpr_slice_buffer_init(&incoming_byte_stream->slices); + grpc_slice_buffer_init(&incoming_byte_stream->slices); incoming_byte_stream->on_next = NULL; incoming_byte_stream->is_tail = 1; incoming_byte_stream->error = GRPC_ERROR_NONE; @@ -2163,7 +2168,7 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, t->peer_string); } send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM, - gpr_slice_from_static_string("Buffers full")); + grpc_slice_from_static_string("Buffers full")); } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR @@ -2293,6 +2298,14 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); } +/******************************************************************************* + * MONITORING + */ +static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, + grpc_transport *t) { + return ((grpc_chttp2_transport *)t)->ep; +} + static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), "chttp2", init_stream, @@ -2302,7 +2315,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), perform_transport_op, destroy_stream, destroy_transport, - chttp2_get_peer}; + chttp2_get_peer, + chttp2_get_endpoint}; grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, @@ -2314,12 +2328,12 @@ grpc_transport *grpc_create_chttp2_transport( void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - gpr_slice_buffer *read_buffer) { + grpc_slice_buffer *read_buffer) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; GRPC_CHTTP2_REF_TRANSPORT( t, "reading_action"); /* matches unref inside reading_action */ if (read_buffer != NULL) { - gpr_slice_buffer_move_into(read_buffer, &t->read_buffer); + grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } read_action_begin(exec_ctx, t, GRPC_ERROR_NONE); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 4e2d0954bf..c372174f2d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -48,6 +48,6 @@ grpc_transport *grpc_create_chttp2_transport( /// leftover bytes previously read from the endpoint (e.g., by handshakers). void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - gpr_slice_buffer *read_buffer); + grpc_slice_buffer *read_buffer); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame.h b/src/core/ext/transport/chttp2/transport/frame.h index 1e444a91fd..ffd4d9669b 100644 --- a/src/core/ext/transport/chttp2/transport/frame.h +++ b/src/core/ext/transport/chttp2/transport/frame.h @@ -34,8 +34,8 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_H +#include <grpc/slice.h> #include <grpc/support/port_platform.h> -#include <grpc/support/slice.h> #include "src/core/lib/iomgr/error.h" diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index 8668816930..f9b9e1b309 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -40,6 +40,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport.h" @@ -112,16 +113,16 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( return out; } -void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, +void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, grpc_transport_one_way_stats *stats, - gpr_slice_buffer *outbuf) { - gpr_slice hdr; + grpc_slice_buffer *outbuf) { + grpc_slice hdr; uint8_t *p; static const size_t header_size = 9; - hdr = gpr_slice_malloc(header_size); - p = GPR_SLICE_START_PTR(hdr); + hdr = grpc_slice_malloc(header_size); + p = GRPC_SLICE_START_PTR(hdr); GPR_ASSERT(write_bytes < (1 << 24)); *p++ = (uint8_t)(write_bytes >> 16); *p++ = (uint8_t)(write_bytes >> 8); @@ -132,9 +133,9 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, *p++ = (uint8_t)(id >> 16); *p++ = (uint8_t)(id >> 8); *p++ = (uint8_t)(id); - gpr_slice_buffer_add(outbuf, hdr); + grpc_slice_buffer_add(outbuf, hdr); - gpr_slice_buffer_move_first(inbuf, write_bytes, outbuf); + grpc_slice_buffer_move_first(inbuf, write_bytes, outbuf); stats->framing_bytes += header_size; stats->data_bytes += write_bytes; @@ -143,9 +144,9 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice) { - uint8_t *const beg = GPR_SLICE_START_PTR(slice); - uint8_t *const end = GPR_SLICE_END_PTR(slice); + grpc_slice slice) { + uint8_t *const beg = GRPC_SLICE_START_PTR(slice); + uint8_t *const end = GRPC_SLICE_END_PTR(slice); uint8_t *cur = beg; uint32_t message_flags; grpc_chttp2_incoming_byte_stream *incoming_byte_stream; @@ -176,7 +177,7 @@ static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, (intptr_t)s->id); gpr_free(msg); - msg = gpr_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, msg); gpr_free(msg); @@ -236,7 +237,7 @@ static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, s->stats.incoming.data_bytes += p->frame_size; grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, - gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, GRPC_ERROR_NONE); p->parsing_frame = NULL; @@ -246,8 +247,8 @@ static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, s->stats.incoming.data_bytes += p->frame_size; grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, - gpr_slice_sub(slice, (size_t)(cur - beg), - (size_t)(cur + p->frame_size - beg))); + grpc_slice_sub(slice, (size_t)(cur - beg), + (size_t)(cur + p->frame_size - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, GRPC_ERROR_NONE); p->parsing_frame = NULL; @@ -257,7 +258,7 @@ static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, GPR_ASSERT(remaining <= p->frame_size); grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, - gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); p->frame_size -= remaining; s->stats.incoming.data_bytes += remaining; return GRPC_ERROR_NONE; @@ -270,7 +271,7 @@ static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last) { + grpc_slice slice, int is_last) { grpc_chttp2_data_parser *p = parser; grpc_error *error = parse_inner(exec_ctx, p, t, s, slice); diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index eb2d97d898..264ad14608 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -36,8 +36,8 @@ /* Parser for GRPC streams embedded in DATA frames */ -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/byte_stream.h" @@ -94,11 +94,11 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last); + grpc_slice slice, int is_last); -void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, +void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, grpc_transport_one_way_stats *stats, - gpr_slice_buffer *outbuf); + grpc_slice_buffer *outbuf); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.c b/src/core/ext/transport/chttp2/transport/frame_goaway.c index 33d2269169..d99d486c1b 100644 --- a/src/core/ext/transport/chttp2/transport/frame_goaway.c +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.c @@ -71,9 +71,9 @@ grpc_error *grpc_chttp2_goaway_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last) { - uint8_t *const beg = GPR_SLICE_START_PTR(slice); - uint8_t *const end = GPR_SLICE_END_PTR(slice); + grpc_slice slice, int is_last) { + uint8_t *const beg = GRPC_SLICE_START_PTR(slice); + uint8_t *const end = GRPC_SLICE_END_PTR(slice); uint8_t *cur = beg; grpc_chttp2_goaway_parser *p = parser; @@ -151,7 +151,7 @@ grpc_error *grpc_chttp2_goaway_parser_parse(grpc_exec_ctx *exec_ctx, if (is_last) { grpc_chttp2_add_incoming_goaway( exec_ctx, t, (uint32_t)p->error_code, - gpr_slice_new(p->debug_data, p->debug_length, gpr_free)); + grpc_slice_new(p->debug_data, p->debug_length, gpr_free)); p->debug_data = NULL; } return GRPC_ERROR_NONE; @@ -160,13 +160,13 @@ grpc_error *grpc_chttp2_goaway_parser_parse(grpc_exec_ctx *exec_ctx, } void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code, - gpr_slice debug_data, - gpr_slice_buffer *slice_buffer) { - gpr_slice header = gpr_slice_malloc(9 + 4 + 4); - uint8_t *p = GPR_SLICE_START_PTR(header); + grpc_slice debug_data, + grpc_slice_buffer *slice_buffer) { + grpc_slice header = grpc_slice_malloc(9 + 4 + 4); + uint8_t *p = GRPC_SLICE_START_PTR(header); uint32_t frame_length; - GPR_ASSERT(GPR_SLICE_LENGTH(debug_data) < UINT32_MAX - 4 - 4); - frame_length = 4 + 4 + (uint32_t)GPR_SLICE_LENGTH(debug_data); + GPR_ASSERT(GRPC_SLICE_LENGTH(debug_data) < UINT32_MAX - 4 - 4); + frame_length = 4 + 4 + (uint32_t)GRPC_SLICE_LENGTH(debug_data); /* frame header: length */ *p++ = (uint8_t)(frame_length >> 16); @@ -191,7 +191,7 @@ void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code, *p++ = (uint8_t)(error_code >> 16); *p++ = (uint8_t)(error_code >> 8); *p++ = (uint8_t)(error_code); - GPR_ASSERT(p == GPR_SLICE_END_PTR(header)); - gpr_slice_buffer_add(slice_buffer, header); - gpr_slice_buffer_add(slice_buffer, debug_data); + GPR_ASSERT(p == GRPC_SLICE_END_PTR(header)); + grpc_slice_buffer_add(slice_buffer, header); + grpc_slice_buffer_add(slice_buffer, debug_data); } diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.h b/src/core/ext/transport/chttp2/transport/frame_goaway.h index 355104a5a7..21fe819488 100644 --- a/src/core/ext/transport/chttp2/transport/frame_goaway.h +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.h @@ -34,9 +34,9 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_GOAWAY_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_GOAWAY_H +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include <grpc/support/port_platform.h> -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -69,10 +69,10 @@ grpc_error *grpc_chttp2_goaway_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last); + grpc_slice slice, int is_last); void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code, - gpr_slice debug_data, - gpr_slice_buffer *slice_buffer); + grpc_slice debug_data, + grpc_slice_buffer *slice_buffer); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_GOAWAY_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 624f42649d..7de5f6362d 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -40,9 +40,9 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -gpr_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) { - gpr_slice slice = gpr_slice_malloc(9 + 8); - uint8_t *p = GPR_SLICE_START_PTR(slice); +grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) { + grpc_slice slice = grpc_slice_malloc(9 + 8); + uint8_t *p = GRPC_SLICE_START_PTR(slice); *p++ = 0; *p++ = 0; @@ -76,9 +76,9 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser, grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last) { - uint8_t *const beg = GPR_SLICE_START_PTR(slice); - uint8_t *const end = GPR_SLICE_END_PTR(slice); + grpc_slice slice, int is_last) { + uint8_t *const beg = GRPC_SLICE_START_PTR(slice); + uint8_t *const end = GRPC_SLICE_END_PTR(slice); uint8_t *cur = beg; grpc_chttp2_ping_parser *p = parser; @@ -93,8 +93,8 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, if (p->is_ack) { grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes); } else { - gpr_slice_buffer_add(&t->qbuf, - grpc_chttp2_ping_create(1, p->opaque_8bytes)); + grpc_slice_buffer_add(&t->qbuf, + grpc_chttp2_ping_create(1, p->opaque_8bytes)); grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response"); } } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h index 2071f647fb..b9889e2d11 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.h +++ b/src/core/ext/transport/chttp2/transport/frame_ping.h @@ -34,7 +34,7 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H -#include <grpc/support/slice.h> +#include <grpc/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -44,13 +44,13 @@ typedef struct { uint8_t opaque_8bytes[8]; } grpc_chttp2_ping_parser; -gpr_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes); +grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes); grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser, uint32_t length, uint8_t flags); grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last); + grpc_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c index 9eac050797..b4c5ed769b 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c @@ -42,12 +42,12 @@ #include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/ext/transport/chttp2/transport/status_conversion.h" -gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, - grpc_transport_one_way_stats *stats) { +grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, + grpc_transport_one_way_stats *stats) { static const size_t frame_size = 13; - gpr_slice slice = gpr_slice_malloc(frame_size); + grpc_slice slice = grpc_slice_malloc(frame_size); stats->framing_bytes += frame_size; - uint8_t *p = GPR_SLICE_START_PTR(slice); + uint8_t *p = GRPC_SLICE_START_PTR(slice); // Frame size. *p++ = 0; @@ -89,9 +89,9 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last) { - uint8_t *const beg = GPR_SLICE_START_PTR(slice); - uint8_t *const end = GPR_SLICE_END_PTR(slice); + grpc_slice slice, int is_last) { + uint8_t *const beg = GRPC_SLICE_START_PTR(slice); + uint8_t *const end = GRPC_SLICE_END_PTR(slice); uint8_t *cur = beg; grpc_chttp2_rst_stream_parser *p = parser; @@ -117,7 +117,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx, char *status_details; gpr_asprintf(&status_details, "Received RST_STREAM with error code %d", reason); - gpr_slice slice_details = gpr_slice_from_copied_string(status_details); + grpc_slice slice_details = grpc_slice_from_copied_string(status_details); gpr_free(status_details); grpc_chttp2_fake_status(exec_ctx, t, s, status_code, &slice_details); } diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h index 5a1f578a29..779507a617 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h @@ -34,7 +34,7 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_RST_STREAM_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_RST_STREAM_H -#include <grpc/support/slice.h> +#include <grpc/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/transport.h" @@ -44,8 +44,8 @@ typedef struct { uint8_t reason_bytes[4]; } grpc_chttp2_rst_stream_parser; -gpr_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code, - grpc_transport_one_way_stats *stats); +grpc_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code, + grpc_transport_one_way_stats *stats); grpc_error *grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser *parser, uint32_t length, uint8_t flags); @@ -53,6 +53,6 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last); + grpc_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_RST_STREAM_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 92022f90c9..98facae87f 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -82,19 +82,19 @@ static uint8_t *fill_header(uint8_t *out, uint32_t length, uint8_t flags) { return out; } -gpr_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, - uint32_t force_mask, size_t count) { +grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, + uint32_t force_mask, size_t count) { size_t i; uint32_t n = 0; - gpr_slice output; + grpc_slice output; uint8_t *p; for (i = 0; i < count; i++) { n += (new[i] != old[i] || (force_mask & (1u << i)) != 0); } - output = gpr_slice_malloc(9 + 6 * n); - p = fill_header(GPR_SLICE_START_PTR(output), 6 * n, 0); + output = grpc_slice_malloc(9 + 6 * n); + p = fill_header(GRPC_SLICE_START_PTR(output), 6 * n, 0); for (i = 0; i < count; i++) { if (new[i] != old[i] || (force_mask & (1u << i)) != 0) { @@ -109,14 +109,14 @@ gpr_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, } } - GPR_ASSERT(p == GPR_SLICE_END_PTR(output)); + GPR_ASSERT(p == GRPC_SLICE_END_PTR(output)); return output; } -gpr_slice grpc_chttp2_settings_ack_create(void) { - gpr_slice output = gpr_slice_malloc(9); - fill_header(GPR_SLICE_START_PTR(output), 0, GRPC_CHTTP2_FLAG_ACK); +grpc_slice grpc_chttp2_settings_ack_create(void) { + grpc_slice output = grpc_slice_malloc(9); + fill_header(GRPC_SLICE_START_PTR(output), 0, GRPC_CHTTP2_FLAG_ACK); return output; } @@ -146,10 +146,10 @@ grpc_error *grpc_chttp2_settings_parser_begin_frame( grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last) { + grpc_slice slice, int is_last) { grpc_chttp2_settings_parser *parser = p; - const uint8_t *cur = GPR_SLICE_START_PTR(slice); - const uint8_t *end = GPR_SLICE_END_PTR(slice); + const uint8_t *cur = GRPC_SLICE_START_PTR(slice); + const uint8_t *end = GRPC_SLICE_END_PTR(slice); char *msg; if (parser->is_ack) { @@ -164,7 +164,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, if (is_last) { memcpy(parser->target_settings, parser->incoming_settings, GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); - gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); + grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); } return GRPC_ERROR_NONE; } @@ -225,7 +225,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, case GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE: grpc_chttp2_goaway_append( t->last_new_stream_id, sp->error_value, - gpr_slice_from_static_string("HTTP2 settings error"), + grpc_slice_from_static_string("HTTP2 settings error"), &t->qbuf); gpr_asprintf(&msg, "invalid value %u passed for %s", parser->value, sp->name); diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h index 4bfa944cf1..a29dc82106 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.h +++ b/src/core/ext/transport/chttp2/transport/frame_settings.h @@ -34,8 +34,8 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_SETTINGS_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_SETTINGS_H +#include <grpc/slice.h> #include <grpc/support/port_platform.h> -#include <grpc/support/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -87,10 +87,10 @@ extern const grpc_chttp2_setting_parameters grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS]; /* Create a settings frame by diffing old & new, and updating old to be new */ -gpr_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, - uint32_t force_mask, size_t count); +grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, + uint32_t force_mask, size_t count); /* Create an ack settings frame */ -gpr_slice grpc_chttp2_settings_ack_create(void); +grpc_slice grpc_chttp2_settings_ack_create(void); grpc_error *grpc_chttp2_settings_parser_begin_frame( grpc_chttp2_settings_parser *parser, uint32_t length, uint8_t flags, @@ -99,6 +99,6 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last); + grpc_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_SETTINGS_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 418166a6df..31a31c2871 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -38,12 +38,12 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -gpr_slice grpc_chttp2_window_update_create( +grpc_slice grpc_chttp2_window_update_create( uint32_t id, uint32_t window_update, grpc_transport_one_way_stats *stats) { static const size_t frame_size = 13; - gpr_slice slice = gpr_slice_malloc(frame_size); + grpc_slice slice = grpc_slice_malloc(frame_size); stats->header_bytes += frame_size; - uint8_t *p = GPR_SLICE_START_PTR(slice); + uint8_t *p = GRPC_SLICE_START_PTR(slice); GPR_ASSERT(window_update); @@ -81,9 +81,9 @@ grpc_error *grpc_chttp2_window_update_parser_begin_frame( grpc_error *grpc_chttp2_window_update_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, gpr_slice slice, int is_last) { - uint8_t *const beg = GPR_SLICE_START_PTR(slice); - uint8_t *const end = GPR_SLICE_END_PTR(slice); + grpc_chttp2_stream *s, grpc_slice slice, int is_last) { + uint8_t *const beg = GRPC_SLICE_START_PTR(slice); + uint8_t *const end = GRPC_SLICE_END_PTR(slice); uint8_t *cur = beg; grpc_chttp2_window_update_parser *p = parser; diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h index 6e62f31872..f75dfb3d87 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.h +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h @@ -34,7 +34,7 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_WINDOW_UPDATE_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_WINDOW_UPDATE_H -#include <grpc/support/slice.h> +#include <grpc/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/transport.h" @@ -45,13 +45,13 @@ typedef struct { uint32_t amount; } grpc_chttp2_window_update_parser; -gpr_slice grpc_chttp2_window_update_create(uint32_t id, uint32_t window_delta, - grpc_transport_one_way_stats *stats); +grpc_slice grpc_chttp2_window_update_create( + uint32_t id, uint32_t window_delta, grpc_transport_one_way_stats *stats); grpc_error *grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_window_update_parser *parser, uint32_t length, uint8_t flags); grpc_error *grpc_chttp2_window_update_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, gpr_slice slice, int is_last); + grpc_chttp2_stream *s, grpc_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_WINDOW_UPDATE_H */ diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 581471ba02..eb68fe3138 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -76,7 +76,7 @@ typedef struct { uint8_t seen_regular_header; /* output stream id */ uint32_t stream_id; - gpr_slice_buffer *output; + grpc_slice_buffer *output; grpc_transport_one_way_stats *stats; /* maximum size of a frame */ size_t max_frame_size; @@ -104,7 +104,7 @@ static void finish_frame(framer_state *st, int is_header_boundary, type = st->is_first_frame ? GRPC_CHTTP2_FRAME_HEADER : GRPC_CHTTP2_FRAME_CONTINUATION; fill_header( - GPR_SLICE_START_PTR(st->output->slices[st->header_idx]), type, + GRPC_SLICE_START_PTR(st->output->slices[st->header_idx]), type, st->stream_id, st->output->length - st->output_length_at_start_of_frame, (uint8_t)((is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) | (is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0))); @@ -116,7 +116,7 @@ static void finish_frame(framer_state *st, int is_header_boundary, output before beginning */ static void begin_frame(framer_state *st) { st->header_idx = - gpr_slice_buffer_add_indexed(st->output, gpr_slice_malloc(9)); + grpc_slice_buffer_add_indexed(st->output, grpc_slice_malloc(9)); st->output_length_at_start_of_frame = st->output->length; } @@ -147,18 +147,18 @@ static void inc_filter(uint8_t idx, uint32_t *sum, uint8_t *elems) { } } -static void add_header_data(framer_state *st, gpr_slice slice) { - size_t len = GPR_SLICE_LENGTH(slice); +static void add_header_data(framer_state *st, grpc_slice slice) { + size_t len = GRPC_SLICE_LENGTH(slice); size_t remaining; if (len == 0) return; remaining = st->max_frame_size + st->output_length_at_start_of_frame - st->output->length; if (len <= remaining) { st->stats->header_bytes += len; - gpr_slice_buffer_add(st->output, slice); + grpc_slice_buffer_add(st->output, slice); } else { st->stats->header_bytes += remaining; - gpr_slice_buffer_add(st->output, gpr_slice_split_head(&slice, remaining)); + grpc_slice_buffer_add(st->output, grpc_slice_split_head(&slice, remaining)); finish_frame(st, 0, 0); begin_frame(st); add_header_data(st, slice); @@ -167,7 +167,7 @@ static void add_header_data(framer_state *st, gpr_slice slice) { static uint8_t *add_tiny_header_data(framer_state *st, size_t len) { ensure_space(st, len); - return gpr_slice_buffer_tiny_add(st->output, len); + return grpc_slice_buffer_tiny_add(st->output, len); } static void evict_entry(grpc_chttp2_hpack_compressor *c) { @@ -268,9 +268,10 @@ static void emit_indexed(grpc_chttp2_hpack_compressor *c, uint32_t elem_index, len); } -static gpr_slice get_wire_value(grpc_mdelem *elem, uint8_t *huffman_prefix) { - if (grpc_is_binary_header((const char *)GPR_SLICE_START_PTR(elem->key->slice), - GPR_SLICE_LENGTH(elem->key->slice))) { +static grpc_slice get_wire_value(grpc_mdelem *elem, uint8_t *huffman_prefix) { + if (grpc_is_binary_header( + (const char *)GRPC_SLICE_START_PTR(elem->key->slice), + GRPC_SLICE_LENGTH(elem->key->slice))) { *huffman_prefix = 0x80; return grpc_mdstr_as_base64_encoded_and_huffman_compressed(elem->value); } @@ -284,8 +285,8 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, framer_state *st) { uint32_t len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 2); uint8_t huffman_prefix; - gpr_slice value_slice = get_wire_value(elem, &huffman_prefix); - size_t len_val = GPR_SLICE_LENGTH(value_slice); + grpc_slice value_slice = get_wire_value(elem, &huffman_prefix); + size_t len_val = GRPC_SLICE_LENGTH(value_slice); uint32_t len_val_len; GPR_ASSERT(len_val <= UINT32_MAX); len_val_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)len_val, 1); @@ -293,7 +294,7 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, add_tiny_header_data(st, len_pfx), len_pfx); GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, gpr_slice_ref(value_slice)); + add_header_data(st, grpc_slice_ref(value_slice)); } static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, @@ -301,8 +302,8 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, framer_state *st) { uint32_t len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 4); uint8_t huffman_prefix; - gpr_slice value_slice = get_wire_value(elem, &huffman_prefix); - size_t len_val = GPR_SLICE_LENGTH(value_slice); + grpc_slice value_slice = get_wire_value(elem, &huffman_prefix); + size_t len_val = GRPC_SLICE_LENGTH(value_slice); uint32_t len_val_len; GPR_ASSERT(len_val <= UINT32_MAX); len_val_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)len_val, 1); @@ -310,45 +311,45 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, add_tiny_header_data(st, len_pfx), len_pfx); GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, gpr_slice_ref(value_slice)); + add_header_data(st, grpc_slice_ref(value_slice)); } static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, framer_state *st) { - uint32_t len_key = (uint32_t)GPR_SLICE_LENGTH(elem->key->slice); + uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(elem->key->slice); uint8_t huffman_prefix; - gpr_slice value_slice = get_wire_value(elem, &huffman_prefix); - uint32_t len_val = (uint32_t)GPR_SLICE_LENGTH(value_slice); + grpc_slice value_slice = get_wire_value(elem, &huffman_prefix); + uint32_t len_val = (uint32_t)GRPC_SLICE_LENGTH(value_slice); uint32_t len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); uint32_t len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); GPR_ASSERT(len_key <= UINT32_MAX); - GPR_ASSERT(GPR_SLICE_LENGTH(value_slice) <= UINT32_MAX); + GPR_ASSERT(GRPC_SLICE_LENGTH(value_slice) <= UINT32_MAX); *add_tiny_header_data(st, 1) = 0x40; GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, add_tiny_header_data(st, len_key_len), len_key_len); - add_header_data(st, gpr_slice_ref(elem->key->slice)); + add_header_data(st, grpc_slice_ref(elem->key->slice)); GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, gpr_slice_ref(value_slice)); + add_header_data(st, grpc_slice_ref(value_slice)); } static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, framer_state *st) { - uint32_t len_key = (uint32_t)GPR_SLICE_LENGTH(elem->key->slice); + uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(elem->key->slice); uint8_t huffman_prefix; - gpr_slice value_slice = get_wire_value(elem, &huffman_prefix); - uint32_t len_val = (uint32_t)GPR_SLICE_LENGTH(value_slice); + grpc_slice value_slice = get_wire_value(elem, &huffman_prefix); + uint32_t len_val = (uint32_t)GRPC_SLICE_LENGTH(value_slice); uint32_t len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); uint32_t len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); GPR_ASSERT(len_key <= UINT32_MAX); - GPR_ASSERT(GPR_SLICE_LENGTH(value_slice) <= UINT32_MAX); + GPR_ASSERT(GRPC_SLICE_LENGTH(value_slice) <= UINT32_MAX); *add_tiny_header_data(st, 1) = 0x00; GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, add_tiny_header_data(st, len_key_len), len_key_len); - add_header_data(st, gpr_slice_ref(elem->key->slice)); + add_header_data(st, grpc_slice_ref(elem->key->slice)); GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, gpr_slice_ref(value_slice)); + add_header_data(st, grpc_slice_ref(value_slice)); } static void emit_advertise_table_size_change(grpc_chttp2_hpack_compressor *c, @@ -373,8 +374,8 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, uint32_t indices_key; int should_add_elem; - GPR_ASSERT(GPR_SLICE_LENGTH(elem->key->slice) > 0); - if (GPR_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */ + GPR_ASSERT(GRPC_SLICE_LENGTH(elem->key->slice) > 0); + if (GRPC_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */ st->seen_regular_header = 1; } else { GPR_ASSERT( @@ -546,7 +547,7 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, grpc_metadata_batch *metadata, int is_eof, size_t max_frame_size, grpc_transport_one_way_stats *stats, - gpr_slice_buffer *outbuf) { + grpc_slice_buffer *outbuf) { framer_state st; grpc_linked_mdelem *l; gpr_timespec deadline; diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h index 4c3a931549..bcbd675ca2 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h @@ -34,9 +34,9 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_ENCODER_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_ENCODER_H +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include <grpc/support/port_platform.h> -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata_batch.h" @@ -93,6 +93,6 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t id, grpc_metadata_batch *metadata, int is_eof, size_t max_frame_size, grpc_transport_one_way_stats *stats, - gpr_slice_buffer *outbuf); + grpc_slice_buffer *outbuf); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_ENCODER_H */ diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 9702cb2c81..6a9200b8db 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1502,9 +1502,9 @@ static grpc_error *is_binary_indexed_header(grpc_chttp2_hpack_parser *p, GRPC_ERROR_INT_INDEX, (intptr_t)p->index), GRPC_ERROR_INT_SIZE, (intptr_t)p->table.num_ents); } - *is = - grpc_is_binary_header((const char *)GPR_SLICE_START_PTR(elem->key->slice), - GPR_SLICE_LENGTH(elem->key->slice)); + *is = grpc_is_binary_header( + (const char *)GRPC_SLICE_START_PTR(elem->key->slice), + GRPC_SLICE_LENGTH(elem->key->slice)); return GRPC_ERROR_NONE; } @@ -1584,7 +1584,7 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, grpc_chttp2_stream *s = sp; grpc_chttp2_transport *t = s->t; if (!s->write_closed) { - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR, &s->stats.outgoing)); grpc_chttp2_initiate_write(exec_ctx, t, false, "force_rst_stream"); @@ -1597,14 +1597,14 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, void *hpack_parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last) { + grpc_slice slice, int is_last) { grpc_chttp2_hpack_parser *parser = hpack_parser; GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0); if (s != NULL) { - s->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice); + s->stats.incoming.header_bytes += GRPC_SLICE_LENGTH(slice); } grpc_error *error = grpc_chttp2_hpack_parser_parse( - exec_ctx, parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice)); + exec_ctx, parser, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_END_PTR(slice)); if (error != GRPC_ERROR_NONE) { GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0); return error; diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h index 0290c78d5a..a39bf466cd 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.h +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h @@ -116,6 +116,6 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, void *hpack_parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last); + grpc_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_PARSER_H */ diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c index 2b73ec969e..2dc793d304 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_table.c +++ b/src/core/ext/transport/chttp2/transport/hpack_table.c @@ -226,8 +226,8 @@ grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl, /* Evict one element from the table */ static void evict1(grpc_chttp2_hptbl *tbl) { grpc_mdelem *first_ent = tbl->ents[tbl->first_ent]; - size_t elem_bytes = GPR_SLICE_LENGTH(first_ent->key->slice) + - GPR_SLICE_LENGTH(first_ent->value->slice) + + size_t elem_bytes = GRPC_SLICE_LENGTH(first_ent->key->slice) + + GRPC_SLICE_LENGTH(first_ent->value->slice) + GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; GPR_ASSERT(elem_bytes <= tbl->mem_used); tbl->mem_used -= (uint32_t)elem_bytes; @@ -298,8 +298,8 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl, grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { /* determine how many bytes of buffer this entry represents */ - size_t elem_bytes = GPR_SLICE_LENGTH(md->key->slice) + - GPR_SLICE_LENGTH(md->value->slice) + + size_t elem_bytes = GRPC_SLICE_LENGTH(md->key->slice) + + GRPC_SLICE_LENGTH(md->value->slice) + GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; if (tbl->current_table_bytes > tbl->max_bytes) { diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.h b/src/core/ext/transport/chttp2/transport/hpack_table.h index 45bd9255bf..2ca130e64b 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_table.h +++ b/src/core/ext/transport/chttp2/transport/hpack_table.h @@ -34,8 +34,8 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_TABLE_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_TABLE_H +#include <grpc/slice.h> #include <grpc/support/port_platform.h> -#include <grpc/support/slice.h> #include "src/core/lib/iomgr/error.h" #include "src/core/lib/transport/metadata.h" diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index e0c4a1e925..b74233d992 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -170,14 +170,14 @@ struct grpc_chttp2_incoming_byte_stream { bool is_tail; gpr_mu slice_mu; // protects slices, on_next - gpr_slice_buffer slices; + grpc_slice_buffer slices; grpc_closure *on_next; - gpr_slice *next; + grpc_slice *next; uint32_t remaining_bytes; struct { grpc_closure closure; - gpr_slice *slice; + grpc_slice *slice; size_t max_size_hint; grpc_closure *on_complete; } next_action; @@ -219,7 +219,7 @@ struct grpc_chttp2_transport { grpc_closure read_action_locked; /** incoming read bytes */ - gpr_slice_buffer read_buffer; + grpc_slice_buffer read_buffer; /** address to place a newly accepted stream - set and unset by grpc_chttp2_parsing_accept_stream; used by init_stream to @@ -237,7 +237,7 @@ struct grpc_chttp2_transport { } channel_callback; /** data to write now */ - gpr_slice_buffer outbuf; + grpc_slice_buffer outbuf; /** hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; int64_t outgoing_window; @@ -245,7 +245,7 @@ struct grpc_chttp2_transport { uint8_t is_client; /** data to write next write */ - gpr_slice_buffer qbuf; + grpc_slice_buffer qbuf; /** window available to announce to peer */ int64_t announce_incoming_window; @@ -314,12 +314,12 @@ struct grpc_chttp2_transport { grpc_chttp2_stream *incoming_stream; grpc_error *(*parser)(grpc_exec_ctx *exec_ctx, void *parser_user_data, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last); + grpc_slice slice, int is_last); /* goaway data */ grpc_status_code goaway_error; uint32_t goaway_last_stream_index; - gpr_slice goaway_text; + grpc_slice goaway_text; grpc_chttp2_write_cb *write_cb_pool; @@ -374,7 +374,7 @@ struct grpc_chttp2_stream { grpc_byte_stream *fetching_send_message; uint32_t fetched_send_message_length; - gpr_slice fetching_slice; + grpc_slice fetching_slice; int64_t next_message_end_offset; int64_t flow_controlled_bytes_written; bool complete_fetch_covered_by_poller; @@ -434,7 +434,7 @@ struct grpc_chttp2_stream { bool sent_trailing_metadata; /** how much window should we announce? */ uint32_t announce_window; - gpr_slice_buffer flow_controlled_buffer; + grpc_slice_buffer flow_controlled_buffer; grpc_chttp2_write_cb *on_write_finished_cbs; grpc_chttp2_write_cb *finish_after_write; @@ -466,7 +466,8 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /** Process one slice of incoming data; return 1 if the connection is still viable after reading, or 0 if the connection should be torn down */ grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, gpr_slice slice); + grpc_chttp2_transport *t, + grpc_slice slice); bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); @@ -509,7 +510,7 @@ grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t goaway_error, - gpr_slice goaway_text); + grpc_slice goaway_text); void grpc_chttp2_parsing_become_skip_parser(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); @@ -611,7 +612,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *stream, - grpc_status_code status, gpr_slice *details); + grpc_status_code status, grpc_slice *details); void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, int close_reads, @@ -659,7 +660,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( uint32_t frame_size, uint32_t flags); void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - gpr_slice slice); + grpc_slice slice); void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 8005350ae7..5efb49751c 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -67,14 +67,14 @@ static grpc_error *init_skip_frame_parser(grpc_exec_ctx *exec_ctx, int is_header); static grpc_error *parse_frame_slice(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, gpr_slice slice, + grpc_chttp2_transport *t, grpc_slice slice, int is_last); grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - gpr_slice slice) { - uint8_t *beg = GPR_SLICE_START_PTR(slice); - uint8_t *end = GPR_SLICE_END_PTR(slice); + grpc_slice slice) { + uint8_t *beg = GRPC_SLICE_START_PTR(slice); + uint8_t *end = GRPC_SLICE_END_PTR(slice); uint8_t *cur = beg; grpc_error *err; @@ -229,10 +229,10 @@ grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, case GRPC_DTS_FRAME: GPR_ASSERT(cur < end); if ((uint32_t)(end - cur) == t->incoming_frame_size) { - err = parse_frame_slice(exec_ctx, t, - gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), - (size_t)(end - beg)), - 1); + err = parse_frame_slice( + exec_ctx, t, grpc_slice_sub_no_ref(slice, (size_t)(cur - beg), + (size_t)(end - beg)), + 1); if (err != GRPC_ERROR_NONE) { return err; } @@ -243,8 +243,8 @@ grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, size_t cur_offset = (size_t)(cur - beg); err = parse_frame_slice( exec_ctx, t, - gpr_slice_sub_no_ref(slice, cur_offset, - cur_offset + t->incoming_frame_size), + grpc_slice_sub_no_ref(slice, cur_offset, + cur_offset + t->incoming_frame_size), 1); if (err != GRPC_ERROR_NONE) { return err; @@ -253,10 +253,10 @@ grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, t->incoming_stream = NULL; goto dts_fh_0; /* loop */ } else { - err = parse_frame_slice(exec_ctx, t, - gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), - (size_t)(end - beg)), - 0); + err = parse_frame_slice( + exec_ctx, t, grpc_slice_sub_no_ref(slice, (size_t)(cur - beg), + (size_t)(end - beg)), + 0); if (err != GRPC_ERROR_NONE) { return err; } @@ -331,7 +331,7 @@ static grpc_error *init_frame_parser(grpc_exec_ctx *exec_ctx, static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - gpr_slice slice, int is_last) { + grpc_slice slice, int is_last) { return GRPC_ERROR_NONE; } @@ -430,7 +430,7 @@ error_handler: if (s != NULL) { grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, err); } - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id, GRPC_CHTTP2_PROTOCOL_ERROR, &s->stats.outgoing)); @@ -471,7 +471,8 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdstr_as_c_string(md->value)); *cached_timeout = gpr_inf_future(GPR_TIMESPAN); } - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + cached_timeout = + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } grpc_chttp2_incoming_metadata_buffer_set_deadline( &s->metadata_buffer[0], @@ -722,7 +723,7 @@ static grpc_error *init_settings_frame_parser(grpc_exec_ctx *exec_ctx, } static grpc_error *parse_frame_slice(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, gpr_slice slice, + grpc_chttp2_transport *t, grpc_slice slice, int is_last) { grpc_chttp2_stream *s = t->incoming_stream; grpc_error *err = t->parser(exec_ctx, t->parser_data, t, s, slice, is_last); @@ -737,7 +738,7 @@ static grpc_error *parse_frame_slice(grpc_exec_ctx *exec_ctx, grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); if (s) { s->forced_close_error = err; - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id, GRPC_CHTTP2_PROTOCOL_ERROR, &s->stats.outgoing)); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index b39695a1a5..139e7387c4 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -80,7 +80,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0); if (t->dirtied_local_settings && !t->sent_local_settings) { - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->outbuf, grpc_chttp2_settings_create( t->settings[GRPC_SENT_SETTINGS], t->settings[GRPC_LOCAL_SETTINGS], @@ -91,7 +91,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, } /* simple writes are queued to qbuf, and flushed here */ - gpr_slice_buffer_move_into(&t->qbuf, &t->outbuf); + grpc_slice_buffer_move_into(&t->qbuf, &t->outbuf); GPR_ASSERT(t->qbuf.count == 0); grpc_chttp2_hpack_compressor_set_max_table_size( @@ -130,9 +130,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, /* send any window updates */ if (s->announce_window > 0) { uint32_t announce = s->announce_window; - gpr_slice_buffer_add(&t->outbuf, - grpc_chttp2_window_update_create( - s->id, s->announce_window, &s->stats.outgoing)); + grpc_slice_buffer_add(&t->outbuf, + grpc_chttp2_window_update_create( + s->id, s->announce_window, &s->stats.outgoing)); GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce); } if (sent_initial_metadata) { @@ -162,9 +162,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, s->send_trailing_metadata = NULL; s->sent_trailing_metadata = true; if (!t->is_client && !s->read_closed) { - gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create( - s->id, GRPC_CHTTP2_NO_ERROR, - &s->stats.outgoing)); + grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create( + s->id, GRPC_CHTTP2_NO_ERROR, + &s->stats.outgoing)); } } s->sending_bytes += send_bytes; @@ -194,7 +194,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, s->send_trailing_metadata = NULL; s->sent_trailing_metadata = true; if (!t->is_client && !s->read_closed) { - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->outbuf, grpc_chttp2_rst_stream_create( s->id, GRPC_CHTTP2_NO_ERROR, &s->stats.outgoing)); } @@ -220,8 +220,8 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, announce_incoming_window, announced); grpc_transport_one_way_stats throwaway_stats; - gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create( - 0, announced, &throwaway_stats)); + grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create( + 0, announced, &throwaway_stats)); } GPR_TIMER_END("grpc_chttp2_begin_write", 0); @@ -254,7 +254,7 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end"); } - gpr_slice_buffer_reset_and_unref(&t->outbuf); + grpc_slice_buffer_reset_and_unref(&t->outbuf); GRPC_ERROR_UNREF(error); GPR_TIMER_END("grpc_chttp2_end_write", 0); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 25ad40b935..a4c110101e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -34,14 +34,15 @@ #include <string.h> #include <grpc/impl/codegen/port_platform.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" +#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" @@ -130,7 +131,7 @@ struct read_state { /* vars for holding data destined for the application */ struct grpc_slice_buffer_stream sbs; - gpr_slice_buffer read_slice_buffer; + grpc_slice_buffer read_slice_buffer; /* vars for trailing metadata */ grpc_chttp2_incoming_metadata_buffer trailing_metadata; @@ -517,11 +518,11 @@ static void on_response_trailers_received( Utility function that takes the data from s->write_slice_buffer and assembles into a contiguous byte stream with 5 byte gRPC header prepended. */ -static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer, +static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, char **pp_write_buffer, size_t *p_write_buffer_size) { - gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer); - size_t length = GPR_SLICE_LENGTH(slice); + grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); + size_t length = GRPC_SLICE_LENGTH(slice); *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; /* This is freed in the on_write_completed callback */ char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES); @@ -534,7 +535,7 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer, *p++ = (uint8_t)(length >> 8); *p++ = (uint8_t)(length); /* append actual data */ - memcpy(p, GPR_SLICE_START_PTR(slice), length); + memcpy(p, GRPC_SLICE_START_PTR(slice), length); } /* @@ -610,6 +611,16 @@ static int parse_grpc_header(const uint8_t *data) { return length; } +static bool header_has_authority(grpc_linked_mdelem *head) { + while (head != NULL) { + if (head->md->key == GRPC_MDSTR_AUTHORITY) { + return true; + } + head = head->next; + } + return false; +} + /* Op Execution: Decide if one of the actions contained in the stream op can be executed. This is the heart of the state machine. @@ -817,9 +828,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, result = NO_ACTION_POSSIBLE; CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); } else { - gpr_slice_buffer write_slice_buffer; - gpr_slice slice; - gpr_slice_buffer_init(&write_slice_buffer); + grpc_slice_buffer write_slice_buffer; + grpc_slice slice; + grpc_slice_buffer_init(&write_slice_buffer); grpc_byte_stream_next(NULL, stream_op->send_message, &slice, stream_op->send_message->length, NULL); /* Check that compression flag is OFF. We don't support compression yet. @@ -828,7 +839,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "Compression is not supported"); GPR_ASSERT(stream_op->send_message->flags == 0); } - gpr_slice_buffer_add(&write_slice_buffer, slice); + grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ gpr_log(GPR_ERROR, "Empty request is not supported"); @@ -891,7 +902,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else { stream_state->rs.remaining_bytes = 0; CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response."); - gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer); + grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = @@ -918,15 +929,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } } else if (stream_state->rs.remaining_bytes == 0) { CRONET_LOG(GPR_DEBUG, "read operation complete"); - gpr_slice read_data_slice = - gpr_slice_malloc((uint32_t)stream_state->rs.length_field); - uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); + grpc_slice read_data_slice = + grpc_slice_malloc((uint32_t)stream_state->rs.length_field); + uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); free_read_buffer(s); - gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer); - gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer, - read_data_slice); + grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); + grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, + read_data_slice); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = @@ -981,11 +992,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_op->on_complete && op_can_be_run(stream_op, stream_state, &oas->state, OP_ON_COMPLETE)) { - /* All actions in this stream_op are complete. Call the on_complete callback - */ CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, - NULL); + if (stream_state->state_op_done[OP_CANCEL_ERROR] || + stream_state->state_callback_received[OP_FAILED]) { + grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, + GRPC_ERROR_CANCELLED, NULL); + } else { + /* All actions in this stream_op are complete. Call the on_complete + * callback + */ + grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, + NULL); + } oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; /* reset any send message state, only if this ON_COMPLETE is about a send. @@ -1042,7 +1060,31 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->curr_gs = gs; memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); add_to_storage(s, op); - execute_from_storage(s); + if (op->send_initial_metadata && + header_has_authority(op->send_initial_metadata->list.head)) { + /* Cronet does not support :authority header field. We cancel the call when + this field is present in metadata */ + cronet_bidirectional_stream_header_array header_array; + cronet_bidirectional_stream_header *header; + cronet_bidirectional_stream cbs; + CRONET_LOG(GPR_DEBUG, + ":authority header is provided but not supported;" + " cancel operations"); + /* Notify application that operation is cancelled by forging trailers */ + header_array.count = 1; + header_array.capacity = 1; + header_array.headers = + gpr_malloc(sizeof(cronet_bidirectional_stream_header)); + header = (cronet_bidirectional_stream_header *)header_array.headers; + header->key = "grpc-status"; + header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */ + cbs.annotation = (void *)s; + s->state.state_op_done[OP_CANCEL_ERROR] = true; + on_response_trailers_received(&cbs, &header_array); + gpr_free(header_array.headers); + } else { + execute_from_storage(s); + } } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, @@ -1054,6 +1096,11 @@ static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { return NULL; } +static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, + grpc_transport *gt) { + return NULL; +} + static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) {} @@ -1066,4 +1113,5 @@ const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), perform_op, destroy_stream, destroy_transport, - get_peer}; + get_peer, + get_endpoint}; |