diff options
author | Craig Tiller <ctiller@google.com> | 2017-03-29 09:24:21 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-03-29 09:24:21 -0700 |
commit | 21947d33a08a5e99c96662cae3cd758ed8b93de5 (patch) | |
tree | ebd7fab829f4cb1d63747b6e726e6d42df030727 /src/core/lib | |
parent | b2d8e03c5e1928464efc8251f9406cf8506f8973 (diff) | |
parent | 0c10db72c4427762000d36211b6740f40f4b8627 (diff) |
Merge github.com:grpc/grpc into bugscrub1-proposedfix1
Diffstat (limited to 'src/core/lib')
70 files changed, 1781 insertions, 847 deletions
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 42ef7b7806..75c68a5534 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -90,7 +90,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount, args->server_transport_data, args->arena); return r == 0 ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE("transport stream initialization failed"); + : GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "transport stream initialization failed"); } static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 34114bbebf..ca701ed457 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -55,9 +55,9 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, if (error != GRPC_ERROR_CANCELLED) { grpc_call_element_signal_error( exec_ctx, elem, - grpc_error_set_int(GRPC_ERROR_CREATE("Deadline Exceeded"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_DEADLINE_EXCEEDED)); + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED)); } GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); } diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 1b4240bb10..5861fa6f54 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -236,8 +236,9 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_handshake_manager* mgr = arg; if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. - grpc_handshake_manager_shutdown(exec_ctx, mgr, - GRPC_ERROR_CREATE("Handshake timed out")); + grpc_handshake_manager_shutdown( + exec_ctx, mgr, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake timed out")); } grpc_handshake_manager_unref(exec_ctx, mgr); } diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index f9d0d689ac..967904df1e 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -36,6 +36,7 @@ #include <grpc/support/string_util.h> #include <string.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/security/util/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -56,7 +57,6 @@ typedef struct call_data { grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; grpc_linked_mdelem user_agent; - grpc_linked_mdelem payload_bin; grpc_metadata_batch *recv_initial_metadata; grpc_metadata_batch *recv_trailing_metadata; @@ -108,11 +108,11 @@ static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, grpc_error *e = grpc_error_set_str( grpc_error_set_int( grpc_error_set_str( - GRPC_ERROR_CREATE( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Received http2 :status header with non-200 OK status"), - GRPC_ERROR_STR_VALUE, val), + GRPC_ERROR_STR_VALUE, grpc_slice_from_copied_string(val)), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED), - GRPC_ERROR_STR_GRPC_MESSAGE, msg); + GRPC_ERROR_STR_GRPC_MESSAGE, grpc_slice_from_copied_string(msg)); gpr_free(val); gpr_free(msg); return e; @@ -292,19 +292,58 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, continue_send_message(exec_ctx, elem); if (calld->send_message_blocked == false) { - /* when all the send_message data is available, then create a MDELEM and - append to headers */ - grpc_mdelem payload_bin = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_GRPC_PAYLOAD_BIN, - grpc_slice_from_copied_buffer((const char *)calld->payload_bytes, - op->send_message->length)); - error = - grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata, - &calld->payload_bin, payload_bin); + /* when all the send_message data is available, then modify the path + * MDELEM by appending base64 encoded query to the path */ + const int k_url_safe = 1; + const int k_multi_line = 0; + const unsigned char k_query_separator = '?'; + + grpc_slice path_slice = + GRPC_MDVALUE(op->send_initial_metadata->idx.named.path->md); + /* sum up individual component's lengths and allocate enough memory to + * hold combined path+query */ + size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); + estimated_len++; /* for the '?' */ + estimated_len += grpc_base64_estimate_encoded_size( + op->send_message->length, k_url_safe, k_multi_line); + estimated_len += 1; /* for the trailing 0 */ + grpc_slice path_with_query_slice = grpc_slice_malloc(estimated_len); + + /* memcopy individual pieces into this slice */ + uint8_t *write_ptr = + (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice); + uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); + memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); + write_ptr += GRPC_SLICE_LENGTH(path_slice); + + *write_ptr = k_query_separator; + write_ptr++; /* for the '?' */ + + grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes, + op->send_message->length, k_url_safe, + k_multi_line); + + /* remove trailing unused memory and add trailing 0 to terminate string + */ + char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); + /* safe to use strlen since base64_encode will always add '\0' */ + size_t path_length = strlen(t) + 1; + *(t + path_length) = '\0'; + path_with_query_slice = + grpc_slice_sub(path_with_query_slice, 0, path_length); + + /* substitute previous path with the new path+query */ + grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); + grpc_metadata_batch *b = op->send_initial_metadata; + error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + mdelem_path_and_query); if (error != GRPC_ERROR_NONE) return error; + calld->on_complete = op->on_complete; op->on_complete = &calld->hc_on_complete; op->send_message = NULL; + grpc_slice_unref_internal(exec_ctx, path_with_query_slice); } else { /* Not all data is available. Fall back to POST. */ gpr_log(GPR_DEBUG, diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index bebd3af335..8d3c488ea0 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -37,6 +37,7 @@ #include <grpc/support/log.h> #include <string.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/security/util/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -51,8 +52,8 @@ typedef struct call_data { grpc_linked_mdelem status; grpc_linked_mdelem content_type; - /* did this request come with payload-bin */ - bool seen_payload_bin; + /* did this request come with path query containing request payload */ + bool seen_path_with_query; /* flag to ensure payload_bin is delivered only once */ bool payload_bin_delivered; @@ -61,7 +62,7 @@ typedef struct call_data { bool *recv_cacheable_request; /** Closure to call when finished with the hs_on_recv hook */ grpc_closure *on_done_recv; - /** Closure to call when we retrieve read message from the payload-bin header + /** Closure to call when we retrieve read message from the path URI */ grpc_closure *recv_message_ready; grpc_closure *on_complete; @@ -101,7 +102,7 @@ static void add_error(const char *error_name, grpc_error **cumulative, grpc_error *new) { if (new == GRPC_ERROR_NONE) return; if (*cumulative == GRPC_ERROR_NONE) { - *cumulative = GRPC_ERROR_CREATE(error_name); + *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name); } *cumulative = grpc_error_add_child(*cumulative, new); } @@ -125,27 +126,32 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, *calld->recv_cacheable_request = true; } else { add_error(error_name, &error, - grpc_attach_md_to_error(GRPC_ERROR_CREATE("Bad header"), - b->idx.named.method->md)); + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.method->md)); } grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.method); } else { - add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, ":method")); + add_error( + error_name, &error, + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":method"))); } if (b->idx.named.te != NULL) { if (!grpc_mdelem_eq(b->idx.named.te->md, GRPC_MDELEM_TE_TRAILERS)) { add_error(error_name, &error, - grpc_attach_md_to_error(GRPC_ERROR_CREATE("Bad header"), - b->idx.named.te->md)); + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.te->md)); } grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.te); } else { add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, "te")); + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string("te"))); } if (b->idx.named.scheme != NULL) { @@ -153,14 +159,17 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTPS) && !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_GRPC)) { add_error(error_name, &error, - grpc_attach_md_to_error(GRPC_ERROR_CREATE("Bad header"), - b->idx.named.scheme->md)); + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.scheme->md)); } grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.scheme); } else { - add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, ":scheme")); + add_error( + error_name, &error, + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":scheme"))); } if (b->idx.named.content_type != NULL) { @@ -194,8 +203,46 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, if (b->idx.named.path == NULL) { add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, ":path")); + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path"))); + } else if (*calld->recv_cacheable_request == true) { + /* We have a cacheable request made with GET verb. The path contains the + * query parameter which is base64 encoded request payload. */ + const char k_query_separator = '?'; + grpc_slice path_slice = GRPC_MDVALUE(b->idx.named.path->md); + uint8_t *path_ptr = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); + size_t path_length = GRPC_SLICE_LENGTH(path_slice); + /* offset of the character '?' */ + size_t offset = 0; + for (offset = 0; *path_ptr != k_query_separator && offset < path_length; + path_ptr++, offset++) + ; + if (offset < path_length) { + grpc_slice query_slice = + grpc_slice_sub(path_slice, offset + 1, path_length); + + /* substitute path metadata with just the path (not query) */ + grpc_mdelem mdelem_path_without_query = grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_PATH, grpc_slice_sub(path_slice, 0, offset)); + + grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + mdelem_path_without_query); + + /* decode payload from query and add to the slice buffer to be returned */ + const int k_url_safe = 1; + grpc_slice_buffer_add( + &calld->read_slice_buffer, + grpc_base64_decode(exec_ctx, + (const char *)GRPC_SLICE_START_PTR(query_slice), + k_url_safe)); + grpc_slice_buffer_stream_init(&calld->read_stream, + &calld->read_slice_buffer, 0); + calld->seen_path_with_query = true; + grpc_slice_unref_internal(exec_ctx, query_slice); + } else { + gpr_log(GPR_ERROR, "GET request without QUERY"); + } } if (b->idx.named.host != NULL && b->idx.named.authority == NULL) { @@ -212,19 +259,11 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, } if (b->idx.named.authority == NULL) { - add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, ":authority")); - } - - if (b->idx.named.grpc_payload_bin != NULL) { - calld->seen_payload_bin = true; - grpc_slice_buffer_add(&calld->read_slice_buffer, - grpc_slice_ref_internal( - GRPC_MDVALUE(b->idx.named.grpc_payload_bin->md))); - grpc_slice_buffer_stream_init(&calld->read_stream, - &calld->read_slice_buffer, 0); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_payload_bin); + add_error( + error_name, &error, + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":authority"))); } return error; @@ -247,8 +286,8 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *err) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - /* Call recv_message_ready if we got the payload via the header field */ - if (calld->seen_payload_bin && calld->recv_message_ready != NULL) { + /* Call recv_message_ready if we got the payload via the path field */ + if (calld->seen_path_with_query && calld->recv_message_ready != NULL) { *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; @@ -263,7 +302,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *err) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (calld->seen_payload_bin) { + if (calld->seen_path_with_query) { /* do nothing. This is probably a GET request, and payload will be returned in hs_on_complete callback. */ } else { diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 5ba13fe251..63136650a5 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -121,8 +121,8 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, "Received message larger than max (%u vs. %d)", (*calld->recv_message)->length, calld->max_recv_size); grpc_error* new_error = grpc_error_set_int( - GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_INVALID_ARGUMENT); + GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INVALID_ARGUMENT); if (error == GRPC_ERROR_NONE) { error = new_error; } else { @@ -147,9 +147,10 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", op->send_message->length, calld->max_send_size); grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, grpc_error_set_int(GRPC_ERROR_CREATE(message_string), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_INVALID_ARGUMENT)); + exec_ctx, op, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_INVALID_ARGUMENT)); gpr_free(message_string); return; } diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 6d7aa43b81..453a64b049 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -126,13 +126,15 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, static void append_error(internal_request *req, grpc_error *error) { if (req->overall_error == GRPC_ERROR_NONE) { - req->overall_error = GRPC_ERROR_CREATE("Failed HTTP/1 client request"); + req->overall_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed HTTP/1 client request"); } grpc_resolved_address *addr = &req->addresses->addrs[req->next_address - 1]; char *addr_text = grpc_sockaddr_to_uri(addr); req->overall_error = grpc_error_add_child( req->overall_error, - grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_text)); + grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(addr_text))); gpr_free(addr_text); } @@ -190,8 +192,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, internal_request *req = arg; if (!ep) { - next_address(exec_ctx, req, - GRPC_ERROR_CREATE("Unexplained handshake failure")); + next_address(exec_ctx, req, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Unexplained handshake failure")); return; } @@ -221,8 +223,8 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, } if (req->next_address == req->addresses->naddrs) { finish(exec_ctx, req, - GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets", - &req->overall_error, 1)); + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed HTTP requests to all targets", &req->overall_error, 1)); return; } addr = &req->addresses->addrs[req->next_address++]; diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 354d2f4a09..be6a6d618a 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -95,7 +95,7 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, char *msg; gpr_asprintf(&msg, "Peer name %s is not in peer certificate", c->secure_peer_name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); } grpc_closure_sched(exec_ctx, on_peer_checked, error); diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index b9c56c103c..aac506b800 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -54,26 +54,36 @@ static grpc_error *handle_response_line(grpc_http_parser *parser) { uint8_t *cur = beg; uint8_t *end = beg + parser->cur_line_length; - if (cur == end || *cur++ != 'H') return GRPC_ERROR_CREATE("Expected 'H'"); - if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); - if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); - if (cur == end || *cur++ != 'P') return GRPC_ERROR_CREATE("Expected 'P'"); - if (cur == end || *cur++ != '/') return GRPC_ERROR_CREATE("Expected '/'"); - if (cur == end || *cur++ != '1') return GRPC_ERROR_CREATE("Expected '1'"); - if (cur == end || *cur++ != '.') return GRPC_ERROR_CREATE("Expected '.'"); + if (cur == end || *cur++ != 'H') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'H'"); + if (cur == end || *cur++ != 'T') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'T'"); + if (cur == end || *cur++ != 'T') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'T'"); + if (cur == end || *cur++ != 'P') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'P'"); + if (cur == end || *cur++ != '/') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected '/'"); + if (cur == end || *cur++ != '1') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected '1'"); + if (cur == end || *cur++ != '.') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected '.'"); if (cur == end || *cur < '0' || *cur++ > '1') { - return GRPC_ERROR_CREATE("Expected HTTP/1.0 or HTTP/1.1"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Expected HTTP/1.0 or HTTP/1.1"); } - if (cur == end || *cur++ != ' ') return GRPC_ERROR_CREATE("Expected ' '"); + if (cur == end || *cur++ != ' ') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected ' '"); if (cur == end || *cur < '1' || *cur++ > '9') - return GRPC_ERROR_CREATE("Expected status code"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected status code"); if (cur == end || *cur < '0' || *cur++ > '9') - return GRPC_ERROR_CREATE("Expected status code"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected status code"); if (cur == end || *cur < '0' || *cur++ > '9') - return GRPC_ERROR_CREATE("Expected status code"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected status code"); parser->http.response->status = (cur[-3] - '0') * 100 + (cur[-2] - '0') * 10 + (cur[-1] - '0'); - if (cur == end || *cur++ != ' ') return GRPC_ERROR_CREATE("Expected ' '"); + if (cur == end || *cur++ != ' ') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected ' '"); /* we don't really care about the status code message */ @@ -89,24 +99,33 @@ static grpc_error *handle_request_line(grpc_http_parser *parser) { while (cur != end && *cur++ != ' ') ; - if (cur == end) return GRPC_ERROR_CREATE("No method on HTTP request line"); + if (cur == end) + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No method on HTTP request line"); parser->http.request->method = buf2str(beg, (size_t)(cur - beg - 1)); beg = cur; while (cur != end && *cur++ != ' ') ; - if (cur == end) return GRPC_ERROR_CREATE("No path on HTTP request line"); + if (cur == end) + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No path on HTTP request line"); parser->http.request->path = buf2str(beg, (size_t)(cur - beg - 1)); - if (cur == end || *cur++ != 'H') return GRPC_ERROR_CREATE("Expected 'H'"); - if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); - if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); - if (cur == end || *cur++ != 'P') return GRPC_ERROR_CREATE("Expected 'P'"); - if (cur == end || *cur++ != '/') return GRPC_ERROR_CREATE("Expected '/'"); + if (cur == end || *cur++ != 'H') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'H'"); + if (cur == end || *cur++ != 'T') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'T'"); + if (cur == end || *cur++ != 'T') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'T'"); + if (cur == end || *cur++ != 'P') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'P'"); + if (cur == end || *cur++ != '/') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected '/'"); vers_major = (uint8_t)(*cur++ - '1' + 1); ++cur; if (cur == end) - return GRPC_ERROR_CREATE("End of line in HTTP version string"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "End of line in HTTP version string"); vers_minor = (uint8_t)(*cur++ - '1' + 1); if (vers_major == 1) { @@ -115,18 +134,19 @@ static grpc_error *handle_request_line(grpc_http_parser *parser) { } else if (vers_minor == 1) { parser->http.request->version = GRPC_HTTP_HTTP11; } else { - return GRPC_ERROR_CREATE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); } } else if (vers_major == 2) { if (vers_minor == 0) { parser->http.request->version = GRPC_HTTP_HTTP20; } else { - return GRPC_ERROR_CREATE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); } } else { - return GRPC_ERROR_CREATE("Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); } return GRPC_ERROR_NONE; @@ -139,7 +159,8 @@ static grpc_error *handle_first_line(grpc_http_parser *parser) { case GRPC_HTTP_RESPONSE: return handle_response_line(parser); } - GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); + GPR_UNREACHABLE_CODE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here")); } static grpc_error *add_header(grpc_http_parser *parser) { @@ -154,7 +175,8 @@ static grpc_error *add_header(grpc_http_parser *parser) { GPR_ASSERT(cur != end); if (*cur == ' ' || *cur == '\t') { - error = GRPC_ERROR_CREATE("Continued header lines not supported yet"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Continued header lines not supported yet"); goto done; } @@ -162,7 +184,8 @@ static grpc_error *add_header(grpc_http_parser *parser) { cur++; } if (cur == end) { - error = GRPC_ERROR_CREATE("Didn't find ':' in header string"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Didn't find ':' in header string"); goto done; } GPR_ASSERT(cur >= beg); @@ -222,7 +245,8 @@ static grpc_error *finish_line(grpc_http_parser *parser, } break; case GRPC_HTTP_BODY: - GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); + GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Should never reach here")); } parser->cur_line_length = 0; @@ -240,7 +264,8 @@ static grpc_error *addbyte_body(grpc_http_parser *parser, uint8_t byte) { body_length = &parser->http.request->body_length; body = &parser->http.request->body; } else { - GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); + GPR_UNREACHABLE_CODE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here")); } if (*body_length == parser->body_capacity) { @@ -286,7 +311,8 @@ static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte, if (grpc_http1_trace) gpr_log(GPR_ERROR, "HTTP header max line length (%d) exceeded", GRPC_HTTP_PARSER_MAX_HEADER_LENGTH); - return GRPC_ERROR_CREATE("HTTP header max line length exceeded"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "HTTP header max line length exceeded"); } parser->cur_line[parser->cur_line_length] = byte; parser->cur_line_length++; @@ -347,7 +373,7 @@ grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, grpc_slice slice, grpc_error *grpc_http_parser_eof(grpc_http_parser *parser) { if (parser->state != GRPC_HTTP_BODY) { - return GRPC_ERROR_CREATE("Did not finish headers"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Did not finish headers"); } return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 509c1ff95d..6633fb68ec 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -33,6 +33,7 @@ #include "src/core/lib/iomgr/closure.h" +#include <assert.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -124,6 +125,7 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_run", 0); if (c != NULL) { + assert(c->cb); c->scheduler->vtable->run(exec_ctx, c, error); } else { GRPC_ERROR_UNREF(error); @@ -135,6 +137,7 @@ void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_sched", 0); if (c != NULL) { + assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, error); } else { GRPC_ERROR_UNREF(error); @@ -146,6 +149,7 @@ void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { grpc_closure *c = list->head; while (c != NULL) { grpc_closure *next = c->next_data.next; + assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); c = next; } diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index fa9966c3a6..2bc476bbef 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -33,6 +33,7 @@ #include "src/core/lib/iomgr/combiner.h" +#include <assert.h> #include <string.h> #include <grpc/support/alloc.h> @@ -216,6 +217,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, cl, covered_by_poller, last)); GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + assert(cl->cb); cl->error_data.scratch = pack_error_data((error_data){error, covered_by_poller}); if (covered_by_poller) { diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 1127fff756..1dbb64e8f3 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -35,7 +35,6 @@ #include <string.h> -#include <grpc/slice.h> #include <grpc/status.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -287,7 +286,7 @@ static void internal_add_error(grpc_error **err, grpc_error *new) { // It is very common to include and extra int and string in an error #define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME) -grpc_error *grpc_error_create(const char *file, int line, const char *desc, +grpc_error *grpc_error_create(grpc_slice file, int line, grpc_slice desc, grpc_error **referencing, size_t num_referencing) { GPR_TIMER_BEGIN("grpc_error_create", 0); @@ -313,14 +312,8 @@ grpc_error *grpc_error_create(const char *file, int line, const char *desc, memset(err->times, UINT8_MAX, GRPC_ERROR_TIME_MAX); internal_set_int(&err, GRPC_ERROR_INT_FILE_LINE, line); - internal_set_str(&err, GRPC_ERROR_STR_FILE, - grpc_slice_from_static_string(file)); - internal_set_str( - &err, GRPC_ERROR_STR_DESCRIPTION, - grpc_slice_from_copied_buffer( - desc, - strlen(desc) + - 1)); // TODO, pull this up. // TODO(ncteisen), pull this up. + internal_set_str(&err, GRPC_ERROR_STR_FILE, file); + internal_set_str(&err, GRPC_ERROR_STR_DESCRIPTION, desc); for (size_t i = 0; i < num_referencing; ++i) { if (referencing[i] == GRPC_ERROR_NONE) continue; @@ -360,7 +353,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) { GPR_TIMER_BEGIN("copy_error_and_unref", 0); grpc_error *out; if (grpc_error_is_special(in)) { - out = GRPC_ERROR_CREATE("unknown"); + out = GRPC_ERROR_CREATE_FROM_STATIC_STRING("unknown"); if (in == GRPC_ERROR_NONE) { internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, grpc_slice_from_static_string("no error")); @@ -417,7 +410,7 @@ typedef struct { const char *msg; } special_error_status_map; static special_error_status_map error_status_map[] = { - {GRPC_ERROR_NONE, GRPC_STATUS_OK, NULL}, + {GRPC_ERROR_NONE, GRPC_STATUS_OK, ""}, {GRPC_ERROR_CANCELLED, GRPC_STATUS_CANCELLED, "Cancelled"}, {GRPC_ERROR_OOM, GRPC_STATUS_RESOURCE_EXHAUSTED, "Out of memory"}, }; @@ -448,33 +441,33 @@ bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { } grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, - const char *value) { + grpc_slice str) { GPR_TIMER_BEGIN("grpc_error_set_str", 0); grpc_error *new = copy_error_and_unref(src); - internal_set_str(&new, which, - grpc_slice_from_copied_buffer( - value, strlen(value) + 1)); // TODO, pull this up. + internal_set_str(&new, which, str); GPR_TIMER_END("grpc_error_set_str", 0); return new; } -const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) { +bool grpc_error_get_str(grpc_error *err, grpc_error_strs which, + grpc_slice *str) { if (grpc_error_is_special(err)) { if (which == GRPC_ERROR_STR_GRPC_MESSAGE) { for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); i++) { if (error_status_map[i].error == err) { - return error_status_map[i].msg; + *str = grpc_slice_from_static_string(error_status_map[i].msg); + return true; } } } - return NULL; + return false; } uint8_t slot = err->strs[which]; if (slot != UINT8_MAX) { - return (const char *)GRPC_SLICE_START_PTR( - *(grpc_slice *)(err->arena + slot)); + *str = *(grpc_slice *)(err->arena + slot); + return true; } else { - return NULL; + return false; } } @@ -515,13 +508,14 @@ static void append_str(const char *str, char **s, size_t *sz, size_t *cap) { } } -static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) { +static void append_esc_str(const uint8_t *str, size_t len, char **s, size_t *sz, + size_t *cap) { static const char *hex = "0123456789abcdef"; append_chr('"', s, sz, cap); - for (const uint8_t *c = (const uint8_t *)str; *c; c++) { - if (*c < 32 || *c >= 127) { + for (size_t i = 0; i < len; i++, str++) { + if (*str < 32 || *str >= 127) { append_chr('\\', s, sz, cap); - switch (*c) { + switch (*str) { case '\b': append_chr('b', s, sz, cap); break; @@ -541,12 +535,12 @@ static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) { append_chr('u', s, sz, cap); append_chr('0', s, sz, cap); append_chr('0', s, sz, cap); - append_chr(hex[*c >> 4], s, sz, cap); - append_chr(hex[*c & 0x0f], s, sz, cap); + append_chr(hex[*str >> 4], s, sz, cap); + append_chr(hex[*str & 0x0f], s, sz, cap); break; } } else { - append_chr((char)*c, s, sz, cap); + append_chr((char)*str, s, sz, cap); } } append_chr('"', s, sz, cap); @@ -586,11 +580,12 @@ static char *key_str(grpc_error_strs which) { return gpr_strdup(error_str_name(which)); } -static char *fmt_str(void *p) { +static char *fmt_str(grpc_slice slice) { char *s = NULL; size_t sz = 0; size_t cap = 0; - append_esc_str(p, &s, &sz, &cap); + append_esc_str((const uint8_t *)GRPC_SLICE_START_PTR(slice), + GRPC_SLICE_LENGTH(slice), &s, &sz, &cap); append_chr(0, &s, &sz, &cap); return s; } @@ -599,9 +594,8 @@ static void collect_strs_kvs(grpc_error *err, kv_pairs *kvs) { for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) { uint8_t slot = err->strs[which]; if (slot != UINT8_MAX) { - append_kv( - kvs, key_str((grpc_error_strs)which), - fmt_str(GRPC_SLICE_START_PTR(*(grpc_slice *)(err->arena + slot)))); + append_kv(kvs, key_str((grpc_error_strs)which), + fmt_str(*(grpc_slice *)(err->arena + slot))); } } } @@ -681,7 +675,8 @@ static char *finish_kvs(kv_pairs *kvs) { append_chr('{', &s, &sz, &cap); for (size_t i = 0; i < kvs->num_kvs; i++) { if (i != 0) append_chr(',', &s, &sz, &cap); - append_esc_str(kvs->kvs[i].key, &s, &sz, &cap); + append_esc_str((const uint8_t *)kvs->kvs[i].key, strlen(kvs->kvs[i].key), + &s, &sz, &cap); gpr_free(kvs->kvs[i].key); append_chr(':', &s, &sz, &cap); append_str(kvs->kvs[i].value, &s, &sz, &cap); @@ -733,10 +728,14 @@ grpc_error *grpc_os_error(const char *file, int line, int err, const char *call_name) { return grpc_error_set_str( grpc_error_set_str( - grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0), - GRPC_ERROR_INT_ERRNO, err), - GRPC_ERROR_STR_OS_ERROR, strerror(err)), - GRPC_ERROR_STR_SYSCALL, call_name); + grpc_error_set_int( + grpc_error_create(grpc_slice_from_static_string(file), line, + grpc_slice_from_static_string("OS Error"), NULL, + 0), + GRPC_ERROR_INT_ERRNO, err), + GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(strerror(err))), + GRPC_ERROR_STR_SYSCALL, grpc_slice_from_static_string(call_name)); } #ifdef GPR_WINDOWS @@ -745,10 +744,13 @@ grpc_error *grpc_wsa_error(const char *file, int line, int err, char *utf8_message = gpr_format_message(err); grpc_error *error = grpc_error_set_str( grpc_error_set_str( - grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0), - GRPC_ERROR_INT_WSA_ERROR, err), - GRPC_ERROR_STR_OS_ERROR, utf8_message), - GRPC_ERROR_STR_SYSCALL, call_name); + grpc_error_set_int( + grpc_error_create(grpc_slice_from_static_string(file), line, + grpc_slice_from_static_string("OS Error"), NULL, + 0), + GRPC_ERROR_INT_WSA_ERROR, err), + GRPC_ERROR_STR_OS_ERROR, grpc_slice_from_copied_string(utf8_message)), + GRPC_ERROR_STR_SYSCALL, grpc_slice_from_static_string(call_name)); gpr_free(utf8_message); return error; } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index eb953947ae..2a44fcfe25 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -37,6 +37,7 @@ #include <stdbool.h> #include <stdint.h> +#include <grpc/slice.h> #include <grpc/status.h> #include <grpc/support/time.h> @@ -45,28 +46,9 @@ extern "C" { #endif /// Opaque representation of an error. -/// Errors are refcounted objects that represent the result of an operation. -/// Ownership laws: -/// if a grpc_error is returned by a function, the caller owns a ref to that -/// instance -/// if a grpc_error is passed to a grpc_closure callback function (functions -/// with the signature: -/// void (*f)(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error)) -/// then those functions do not own a ref to error (but are free to manually -/// take a reference). -/// if a grpc_error is passed to *ANY OTHER FUNCTION* then that function takes -/// ownership of the error -/// Errors have: -/// a set of ints, strings, and timestamps that describe the error -/// always present are: -/// GRPC_ERROR_STR_FILE, GRPC_ERROR_INT_FILE_LINE - source location the error -/// was generated -/// GRPC_ERROR_STR_DESCRIPTION - a human readable description of the error -/// GRPC_ERROR_TIME_CREATED - a timestamp indicating when the error happened -/// an error can also have children; these are other errors that are believed -/// to have contributed to this one. By accumulating children, we can begin -/// to root cause high level failures from low level failures, without having -/// to derive execution paths from log lines +/// See https://github.com/grpc/grpc/blob/master/doc/core/grpc-error.md for a +/// full write up of this object. + typedef struct grpc_error grpc_error; typedef enum { @@ -156,7 +138,7 @@ typedef enum { const char *grpc_error_string(grpc_error *error); /// Create an error - but use GRPC_ERROR_CREATE instead -grpc_error *grpc_error_create(const char *file, int line, const char *desc, +grpc_error *grpc_error_create(grpc_slice file, int line, grpc_slice desc, grpc_error **referencing, size_t num_referencing); /// Create an error (this is the preferred way of generating an error that is /// not due to a system call - for system calls, use GRPC_OS_ERROR or @@ -166,13 +148,21 @@ grpc_error *grpc_error_create(const char *file, int line, const char *desc, /// err = grpc_error_create(x, y, z, r, nr) is equivalent to: /// err = grpc_error_create(x, y, z, NULL, 0); /// for (i=0; i<nr; i++) err = grpc_error_add_child(err, r[i]); -#define GRPC_ERROR_CREATE(desc) \ - grpc_error_create(__FILE__, __LINE__, desc, NULL, 0) +#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc) \ + grpc_error_create(grpc_slice_from_static_string(__FILE__), __LINE__, \ + grpc_slice_from_static_string(desc), NULL, 0) +#define GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc) \ + grpc_error_create(grpc_slice_from_static_string(__FILE__), __LINE__, \ + grpc_slice_from_copied_string(desc), NULL, 0) // Create an error that references some other errors. This function adds a // reference to each error in errs - it does not consume an existing reference -#define GRPC_ERROR_CREATE_REFERENCING(desc, errs, count) \ - grpc_error_create(__FILE__, __LINE__, desc, errs, count) +#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count) \ + grpc_error_create(grpc_slice_from_static_string(__FILE__), __LINE__, \ + grpc_slice_from_static_string(desc), errs, count) +#define GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(desc, errs, count) \ + grpc_error_create(grpc_slice_from_static_string(__FILE__), __LINE__, \ + grpc_slice_from_copied_string(desc), errs, count) //#define GRPC_ERROR_REFCOUNT_DEBUG #ifdef GRPC_ERROR_REFCOUNT_DEBUG @@ -194,10 +184,11 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p); grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, - const char *value) GRPC_MUST_USE_RESULT; -/// Returns NULL if the specified string is not set. -/// Caller does NOT own return value. -const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which); + grpc_slice str) GRPC_MUST_USE_RESULT; +/// Returns false if the specified string is not set. +/// Caller does NOT own the slice. +bool grpc_error_get_str(grpc_error *error, grpc_error_strs which, + grpc_slice *s); /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index d5acdf0d4b..c4c67df381 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -321,7 +321,7 @@ static bool append_error(grpc_error **composite, grpc_error *error, const char *desc) { if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE(desc); + *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc); } *composite = grpc_error_add_child(*composite, error); return false; @@ -1110,7 +1110,13 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, gpr_atm curr = gpr_atm_no_barrier_load(state); switch (curr) { case CLOSURE_NOT_READY: { - /* CLOSURE_NOT_READY -> <closure>. */ + /* CLOSURE_NOT_READY -> <closure>. + + We're guaranteed by API that there's an acquire barrier before here, + so there's no need to double-dip and this can be a release-only. + + The release itself pairs with the acquire half of a set_ready full + barrier. */ if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { return; /* Successful. Return */ } @@ -1141,9 +1147,9 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, schedule the closure with the shutdown error */ if ((curr & FD_SHUTDOWN_BIT) > 0) { grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT); - grpc_closure_sched( - exec_ctx, closure, - GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); + grpc_closure_sched(exec_ctx, closure, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD Shutdown", &shutdown_err, 1)); return; } @@ -1169,7 +1175,9 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, switch (curr) { case CLOSURE_READY: case CLOSURE_NOT_READY: - if (gpr_atm_full_cas(state, curr, new_state)) { + /* Release cas to pair with a set_ready performing a load of the + shutdown state later */ + if (gpr_atm_rel_cas(state, curr, new_state)) { return; /* early out */ } break; /* retry */ @@ -1183,11 +1191,14 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, } /* Fd is not shutdown. Schedule the closure and move the state to - shutdown state. */ + shutdown state. + Needs an acquire to pair with setting the closure (and get a + happens-after on that edge), and a release to pair with anything + loading the shutdown state. */ if (gpr_atm_full_cas(state, curr, new_state)) { - grpc_closure_sched( - exec_ctx, (grpc_closure *)curr, - GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); + grpc_closure_sched(exec_ctx, (grpc_closure *)curr, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD Shutdown", &shutdown_err, 1)); return; } @@ -1212,6 +1223,8 @@ static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { } case CLOSURE_NOT_READY: { + /* No barrier required as we're transitioning to a state that does not + involve a closure */ if (gpr_atm_no_barrier_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { return; /* early out */ } @@ -1223,7 +1236,11 @@ static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { if ((curr & FD_SHUTDOWN_BIT) > 0) { /* The fd is shutdown. Do nothing */ return; - } else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) { + } + /* Full cas: acquire pairs with this cas' release in the event of a + spurious set_ready; release pairs with this or the acquire in + notify_on (or set_shutdown) */ + else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) { grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); return; } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 5ddd5313e2..ca6e855611 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -451,14 +451,16 @@ static grpc_error *fd_shutdown_error(grpc_fd *fd) { if (!fd->shutdown) { return GRPC_ERROR_NONE; } else { - return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); + return GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD shutdown", &fd->shutdown_error, 1); } } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (fd->shutdown) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown")); + grpc_closure_sched(exec_ctx, closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown")); } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; @@ -696,7 +698,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { static void kick_append_error(grpc_error **composite, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE("Kick Failure"); + *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure"); } *composite = grpc_error_add_child(*composite, error); } @@ -859,7 +861,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { static void work_combine_error(grpc_error **composite, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE("pollset_work"); + *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work"); } *composite = grpc_error_add_child(*composite, error); } @@ -1421,7 +1423,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { g_cvfds.pollcount++; opt = gpr_thd_options_default(); gpr_thd_options_set_detached(&opt); - gpr_thd_new(&t_id, &run_poll, pargs, &opt); + GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt)); // We want the poll() thread to trigger the deadline, so wait forever here gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index a5b62aa888..ae3e2eabc3 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -115,8 +115,8 @@ static void maybe_spawn_locked() { /* All previous instances of the thread should have been joined at this point. * Spawn time! */ g_executor.busy = 1; - gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, - &g_executor.options); + GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, + &g_executor.options)); g_executor.pending_join = 1; } diff --git a/src/core/lib/iomgr/load_file.c b/src/core/lib/iomgr/load_file.c index f40c8b28cc..208f74e20c 100644 --- a/src/core/lib/iomgr/load_file.c +++ b/src/core/lib/iomgr/load_file.c @@ -78,9 +78,12 @@ end: *output = result; if (file != NULL) fclose(file); if (error != GRPC_ERROR_NONE) { - grpc_error *error_out = grpc_error_set_str( - GRPC_ERROR_CREATE_REFERENCING("Failed to load file", &error, 1), - GRPC_ERROR_STR_FILENAME, filename); + grpc_error *error_out = + grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to load file", &error, 1), + GRPC_ERROR_STR_FILENAME, + grpc_slice_from_copied_string( + filename)); // TODO(ncteisen), always static? GRPC_ERROR_UNREF(error); error = error_out; } diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index f1897bb91f..94a454c0b7 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -39,6 +39,7 @@ #if defined(GRPC_UV) // Do nothing #elif defined(GPR_MANYLINUX1) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 @@ -65,6 +66,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_LINUX) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 @@ -90,6 +92,7 @@ #define GRPC_POSIX_SOCKETUTILS #endif #elif defined(GPR_APPLE) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_MSG_IOVLEN_TYPE int @@ -100,6 +103,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_FREEBSD) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 50e470d149..d0ede0f2d5 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -73,14 +73,16 @@ static grpc_error *blocking_resolve_address_impl( /* parse name, splitting it into host and port parts */ gpr_split_host_port(name, &host, &port); if (host == NULL) { - err = grpc_error_set_str(GRPC_ERROR_CREATE("unparseable host:port"), - GRPC_ERROR_STR_TARGET_ADDRESS, name); + err = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("unparseable host:port"), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); goto done; } if (port == NULL) { if (default_port == NULL) { - err = grpc_error_set_str(GRPC_ERROR_CREATE("no port in name"), - GRPC_ERROR_STR_TARGET_ADDRESS, name); + err = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("no port in name"), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); goto done; } port = gpr_strdup(default_port); @@ -112,11 +114,15 @@ static grpc_error *blocking_resolve_address_impl( if (s != 0) { err = grpc_error_set_str( grpc_error_set_str( - grpc_error_set_str(grpc_error_set_int(GRPC_ERROR_CREATE("OS Error"), - GRPC_ERROR_INT_ERRNO, s), - GRPC_ERROR_STR_OS_ERROR, gai_strerror(s)), - GRPC_ERROR_STR_SYSCALL, "getaddrinfo"), - GRPC_ERROR_STR_TARGET_ADDRESS, name); + grpc_error_set_str( + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("OS Error"), + GRPC_ERROR_INT_ERRNO, s), + GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(gai_strerror(s))), + GRPC_ERROR_STR_SYSCALL, + grpc_slice_from_static_string("getaddrinfo")), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); goto done; } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 4d715be94c..102d1aa290 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -92,9 +92,10 @@ static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result, if (status != 0) { grpc_error *error; *addresses = NULL; - error = GRPC_ERROR_CREATE("getaddrinfo failed"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); return error; } (*addresses) = gpr_malloc(sizeof(grpc_resolved_addresses)); @@ -153,7 +154,7 @@ static grpc_error *try_split_host_port(const char *name, if (*host == NULL) { char *msg; gpr_asprintf(&msg, "unparseable host:port: '%s'", name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return error; } @@ -162,7 +163,7 @@ static grpc_error *try_split_host_port(const char *name, if (default_port == NULL) { char *msg; gpr_asprintf(&msg, "no port in name '%s'", name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return error; } @@ -262,8 +263,9 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, if (s != 0) { *addrs = NULL; - err = GRPC_ERROR_CREATE("getaddrinfo failed"); - err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, uv_strerror(s)); + err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed"); + err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(s))); grpc_closure_sched(exec_ctx, on_done, err); gpr_free(r); gpr_free(req); diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 2439ce3cb7..22eca1fd3b 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -78,7 +78,7 @@ static grpc_error *blocking_resolve_address_impl( if (host == NULL) { char *msg; gpr_asprintf(&msg, "unparseable host:port: '%s'", name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); goto done; } @@ -86,7 +86,7 @@ static grpc_error *blocking_resolve_address_impl( if (default_port == NULL) { char *msg; gpr_asprintf(&msg, "no port in name '%s'", name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); goto done; } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 511ffdcdf1..8dcd80d001 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -279,11 +279,17 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, /* update the atomically available resource estimate - use no barriers since timeliness of delivery really doesn't matter much */ static void rq_update_estimate(grpc_resource_quota *resource_quota) { + gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX; + if (resource_quota->size != 0) { + memory_usage_estimation = + GPR_CLAMP((gpr_atm)((1.0 - + ((double)resource_quota->free_pool) / + ((double)resource_quota->size)) * + MEMORY_USAGE_ESTIMATION_MAX), + 0, MEMORY_USAGE_ESTIMATION_MAX); + } gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, - (gpr_atm)((1.0 - - ((double)resource_quota->free_pool) / - ((double)resource_quota->size)) * - MEMORY_USAGE_ESTIMATION_MAX)); + memory_usage_estimation); } /* returns true if all allocations are completed */ diff --git a/src/core/lib/iomgr/socket_factory_posix.c b/src/core/lib/iomgr/socket_factory_posix.c new file mode 100644 index 0000000000..1050a14c46 --- /dev/null +++ b/src/core/lib/iomgr/socket_factory_posix.c @@ -0,0 +1,110 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET + +#include "src/core/lib/iomgr/socket_factory_posix.h" + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +void grpc_socket_factory_init(grpc_socket_factory *factory, + const grpc_socket_factory_vtable *vtable) { + factory->vtable = vtable; + gpr_ref_init(&factory->refcount, 1); +} + +int grpc_socket_factory_socket(grpc_socket_factory *factory, int domain, + int type, int protocol) { + return factory->vtable->socket(factory, domain, type, protocol); +} + +int grpc_socket_factory_bind(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr) { + return factory->vtable->bind(factory, sockfd, addr); +} + +int grpc_socket_factory_compare(grpc_socket_factory *a, + grpc_socket_factory *b) { + int c = GPR_ICMP(a, b); + if (c != 0) { + grpc_socket_factory *sma = a; + grpc_socket_factory *smb = b; + c = GPR_ICMP(sma->vtable, smb->vtable); + if (c == 0) { + c = sma->vtable->compare(sma, smb); + } + } + return c; +} + +grpc_socket_factory *grpc_socket_factory_ref(grpc_socket_factory *factory) { + gpr_ref(&factory->refcount); + return factory; +} + +void grpc_socket_factory_unref(grpc_socket_factory *factory) { + if (gpr_unref(&factory->refcount)) { + factory->vtable->destroy(factory); + } +} + +static void *socket_factory_arg_copy(void *p) { + return grpc_socket_factory_ref(p); +} + +static void socket_factory_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { + grpc_socket_factory_unref(p); +} + +static int socket_factory_cmp(void *a, void *b) { + return grpc_socket_factory_compare((grpc_socket_factory *)a, + (grpc_socket_factory *)b); +} + +static const grpc_arg_pointer_vtable socket_factory_arg_vtable = { + socket_factory_arg_copy, socket_factory_arg_destroy, socket_factory_cmp}; + +grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory) { + grpc_arg arg; + arg.type = GRPC_ARG_POINTER; + arg.key = GRPC_ARG_SOCKET_FACTORY; + arg.value.pointer.vtable = &socket_factory_arg_vtable; + arg.value.pointer.p = factory; + return arg; +} + +#endif diff --git a/src/core/lib/iomgr/socket_factory_posix.h b/src/core/lib/iomgr/socket_factory_posix.h new file mode 100644 index 0000000000..2c63299030 --- /dev/null +++ b/src/core/lib/iomgr/socket_factory_posix.h @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H +#define GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/sync.h> +#include "src/core/lib/iomgr/resolve_address.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** The virtual table of grpc_socket_factory */ +typedef struct { + /** Replacement for socket(2) */ + int (*socket)(grpc_socket_factory *factory, int domain, int type, + int protocol); + /** Replacement for bind(2) */ + int (*bind)(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr); + /** Compare socket factory \a a and \a b */ + int (*compare)(grpc_socket_factory *a, grpc_socket_factory *b); + /** Destroys the socket factory instance */ + void (*destroy)(grpc_socket_factory *factory); +} grpc_socket_factory_vtable; + +/** The Socket Factory interface allows changes on socket options */ +struct grpc_socket_factory { + const grpc_socket_factory_vtable *vtable; + gpr_refcount refcount; +}; + +/** called by concrete implementations to initialize the base struct */ +void grpc_socket_factory_init(grpc_socket_factory *factory, + const grpc_socket_factory_vtable *vtable); + +/** Wrap \a factory as a grpc_arg */ +grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory); + +/** Perform the equivalent of a socket(2) operation using \a factory */ +int grpc_socket_factory_socket(grpc_socket_factory *factory, int domain, + int type, int protocol); + +/** Perform the equivalent of a bind(2) operation using \a factory */ +int grpc_socket_factory_bind(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr); + +/** Compare if \a a and \a b are the same factory or have same settings */ +int grpc_socket_factory_compare(grpc_socket_factory *a, grpc_socket_factory *b); + +grpc_socket_factory *grpc_socket_factory_ref(grpc_socket_factory *factory); +void grpc_socket_factory_unref(grpc_socket_factory *factory); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H */ diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index 88e9ade253..bbe642d0fb 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -90,7 +90,7 @@ grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) { return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)"); } if ((newval != 0) != (val != 0)) { - return GRPC_ERROR_CREATE("Failed to set SO_NOSIGPIPE"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set SO_NOSIGPIPE"); } #endif return GRPC_ERROR_NONE; @@ -164,7 +164,7 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) { return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEADDR)"); } if ((newval != 0) != val) { - return GRPC_ERROR_CREATE("Failed to set SO_REUSEADDR"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set SO_REUSEADDR"); } return GRPC_ERROR_NONE; @@ -173,7 +173,8 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) { /* set a socket to reuse old addresses */ grpc_error *grpc_set_socket_reuse_port(int fd, int reuse) { #ifndef SO_REUSEPORT - return GRPC_ERROR_CREATE("SO_REUSEPORT unavailable on compiling system"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "SO_REUSEPORT unavailable on compiling system"); #else int val = (reuse != 0); int newval; @@ -185,7 +186,7 @@ grpc_error *grpc_set_socket_reuse_port(int fd, int reuse) { return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEPORT)"); } if ((newval != 0) != val) { - return GRPC_ERROR_CREATE("Failed to set SO_REUSEPORT"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set SO_REUSEPORT"); } return GRPC_ERROR_NONE; @@ -204,7 +205,7 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { return GRPC_OS_ERROR(errno, "getsockopt(TCP_NODELAY)"); } if ((newval != 0) != val) { - return GRPC_ERROR_CREATE("Failed to set TCP_NODELAY"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set TCP_NODELAY"); } return GRPC_ERROR_NONE; } @@ -213,7 +214,7 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator) { GPR_ASSERT(mutator); if (!grpc_socket_mutator_mutate_fd(mutator, fd)) { - return GRPC_ERROR_CREATE("grpc_socket_mutator failed."); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_socket_mutator failed."); } return GRPC_ERROR_NONE; } @@ -268,7 +269,8 @@ static grpc_error *error_for_fd(int fd, const grpc_resolved_address *addr) { char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); grpc_error *err = grpc_error_set_str(GRPC_OS_ERROR(errno, "socket"), - GRPC_ERROR_STR_TARGET_ADDRESS, addr_str); + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(addr_str)); gpr_free(addr_str); return err; } @@ -276,11 +278,25 @@ static grpc_error *error_for_fd(int fd, const grpc_resolved_address *addr) { grpc_error *grpc_create_dualstack_socket( const grpc_resolved_address *resolved_addr, int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd) { + return grpc_create_dualstack_socket_using_factory(NULL, resolved_addr, type, + protocol, dsmode, newfd); +} + +static int create_socket(grpc_socket_factory *factory, int domain, int type, + int protocol) { + return (factory != NULL) + ? grpc_socket_factory_socket(factory, domain, type, protocol) + : socket(domain, type, protocol); +} + +grpc_error *grpc_create_dualstack_socket_using_factory( + grpc_socket_factory *factory, const grpc_resolved_address *resolved_addr, + int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd) { const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; int family = addr->sa_family; if (family == AF_INET6) { if (grpc_ipv6_loopback_available()) { - *newfd = socket(family, type, protocol); + *newfd = create_socket(factory, family, type, protocol); } else { *newfd = -1; errno = EAFNOSUPPORT; @@ -302,7 +318,7 @@ grpc_error *grpc_create_dualstack_socket( family = AF_INET; } *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; - *newfd = socket(family, type, protocol); + *newfd = create_socket(factory, family, type, protocol); return error_for_fd(*newfd, resolved_addr); } diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index e84d3781a1..2c2fc95ff9 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -41,6 +41,7 @@ #include <grpc/impl/codegen/grpc_types.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_mutator.h" /* a wrapper for accept or accept4 */ @@ -137,4 +138,10 @@ grpc_error *grpc_create_dualstack_socket(const grpc_resolved_address *addr, grpc_dualstack_mode *dsmode, int *newfd); +/* Same as grpc_create_dualstack_socket(), but use the given socket factory (if + non-null) to create the socket, rather than calling socket() directly. */ +grpc_error *grpc_create_dualstack_socket_using_factory( + grpc_socket_factory *factory, const grpc_resolved_address *addr, int type, + int protocol, grpc_dualstack_mode *dsmode, int *newfd); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 0144192b71..a108b10da6 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -121,8 +121,8 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { - grpc_fd_shutdown(exec_ctx, ac->fd, - GRPC_ERROR_CREATE("connect() timed out")); + grpc_fd_shutdown(exec_ctx, ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "connect() timed out")); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); @@ -191,7 +191,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { gpr_mu_lock(&ac->mu); if (error != GRPC_ERROR_NONE) { error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred"); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string("Timeout occurred")); goto finish; } @@ -252,12 +253,17 @@ finish: gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { char *error_descr; - gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", - grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION)); - error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, error_descr); + grpc_slice str; + bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str); + GPR_ASSERT(ret); + char *desc = grpc_slice_to_c_string(str); + gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", desc); + error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_copied_string(error_descr)); gpr_free(error_descr); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str); + gpr_free(desc); + error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(ac->addr_str)); } if (done) { gpr_mu_destroy(&ac->mu); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 618483d9cb..682c24ed56 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -100,17 +100,21 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { *connect->endpoint = grpc_tcp_create( connect->tcp_handle, connect->resource_quota, connect->addr_name); } else { - error = GRPC_ERROR_CREATE("Failed to connect to remote host"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to connect to remote host"); error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); if (status == UV_ECANCELED) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - "Timeout occurred"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string("Timeout occurred")); // This should only happen if the handle is already closed } else { - error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - uv_strerror(status)); + error = grpc_error_set_str( + error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback); } } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index c8dc9e64bd..a356564766 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -123,7 +123,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { socket = NULL; } } else { - error = GRPC_ERROR_CREATE("socket is null"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("socket is null"); } } @@ -238,8 +238,9 @@ failure: GPR_ASSERT(error != GRPC_ERROR_NONE); char *target_uri = grpc_sockaddr_to_uri(addr); grpc_error *final_error = grpc_error_set_str( - GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &error, 1), - GRPC_ERROR_STR_TARGET_ADDRESS, target_uri); + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Failed to connect", + &error, 1), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(target_uri)); GRPC_ERROR_UNREF(error); if (socket != NULL) { grpc_winsocket_destroy(socket); diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index a4381f8fc9..4d7cf3ff51 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -111,7 +111,8 @@ typedef struct { static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), - GRPC_ERROR_STR_TARGET_ADDRESS, tcp->peer_string); + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(tcp->peer_string)); } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -246,8 +247,10 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, - tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp)); + call_read_cb( + exec_ctx, tcp, + tcp_annotate_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); @@ -464,10 +467,12 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_closure_sched(exec_ctx, cb, - grpc_fd_is_shutdown(tcp->em_fd) - ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp) - : GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, cb, + grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), + tcp) + : GRPC_ERROR_NONE); return; } tcp->outgoing_buffer = buf; diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 5f286a6723..d6a017cf7f 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -44,11 +44,8 @@ #include <errno.h> #include <fcntl.h> -#include <ifaddrs.h> -#include <limits.h> #include <netinet/in.h> #include <netinet/tcp.h> -#include <stdio.h> #include <string.h> #include <sys/socket.h> #include <sys/stat.h> @@ -67,82 +64,10 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" -#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 - -static gpr_once s_init_max_accept_queue_size; -static int s_max_accept_queue_size; - -/* one listening port */ -typedef struct grpc_tcp_listener grpc_tcp_listener; -struct grpc_tcp_listener { - int fd; - grpc_fd *emfd; - grpc_tcp_server *server; - grpc_resolved_address addr; - int port; - unsigned port_index; - unsigned fd_index; - grpc_closure read_closure; - grpc_closure destroyed_closure; - struct grpc_tcp_listener *next; - /* sibling is a linked list of all listeners for a given port. add_port and - clone_port place all new listeners in the same sibling list. A member of - the 'sibling' list is also a member of the 'next' list. The head of each - sibling list has is_sibling==0, and subsequent members of sibling lists - have is_sibling==1. is_sibling allows separate sibling lists to be - identified while iterating through 'next'. */ - struct grpc_tcp_listener *sibling; - int is_sibling; -}; - -/* the overall server */ -struct grpc_tcp_server { - gpr_refcount refs; - /* Called whenever accept() succeeds on a server port. */ - grpc_tcp_server_cb on_accept_cb; - void *on_accept_cb_arg; - - gpr_mu mu; - - /* active port count: how many ports are actually still listening */ - size_t active_ports; - /* destroyed port count: how many ports are completely destroyed */ - size_t destroyed_ports; - - /* is this server shutting down? */ - bool shutdown; - /* have listeners been shutdown? */ - bool shutdown_listeners; - /* use SO_REUSEPORT */ - bool so_reuseport; - /* expand wildcard addresses to a list of all local addresses */ - bool expand_wildcard_addrs; - - /* linked list of server ports */ - grpc_tcp_listener *head; - grpc_tcp_listener *tail; - unsigned nports; - - /* List of closures passed to shutdown_starting_add(). */ - grpc_closure_list shutdown_starting; - - /* shutdown callback */ - grpc_closure *shutdown_complete; - - /* all pollsets interested in new connections */ - grpc_pollset **pollsets; - /* number of pollsets in the pollsets array */ - size_t pollset_count; - - /* next pollset to assign a channel to */ - gpr_atm next_pollset_to_assign; - - grpc_resource_quota *resource_quota; -}; - static gpr_once check_init = GPR_ONCE_INIT; static bool has_so_reuseport = false; @@ -175,8 +100,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT - " must be an integer"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT + " must be an integer"); } } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_POINTER) { @@ -186,8 +111,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA - " must be a pointer to a buffer pool"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); } } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { @@ -195,8 +120,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_EXPAND_WILDCARD_ADDRS - " must be an integer"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer"); } } } @@ -260,10 +185,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); - if (!s->shutdown) { - gpr_mu_unlock(&s->mu); - return; - } + GPR_ASSERT(s->shutdown); if (s->head) { grpc_tcp_listener *sp; @@ -291,8 +213,8 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->active_ports) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(exec_ctx, sp->emfd, - GRPC_ERROR_CREATE("Server destroyed")); + grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Server destroyed")); } gpr_mu_unlock(&s->mu); } else { @@ -301,99 +223,6 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -/* get max listen queue size on linux */ -static void init_max_accept_queue_size(void) { - int n = SOMAXCONN; - char buf[64]; - FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r"); - if (fp == NULL) { - /* 2.4 kernel. */ - s_max_accept_queue_size = SOMAXCONN; - return; - } - if (fgets(buf, sizeof buf, fp)) { - char *end; - long i = strtol(buf, &end, 10); - if (i > 0 && i <= INT_MAX && end && *end == 0) { - n = (int)i; - } - } - fclose(fp); - s_max_accept_queue_size = n; - - if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { - gpr_log(GPR_INFO, - "Suspiciously small accept queue (%d) will probably lead to " - "connection drops", - s_max_accept_queue_size); - } -} - -static int get_max_accept_queue_size(void) { - gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size); - return s_max_accept_queue_size; -} - -/* Prepare a recently-created socket for listening. */ -static grpc_error *prepare_socket(int fd, const grpc_resolved_address *addr, - bool so_reuseport, int *port) { - grpc_resolved_address sockname_temp; - grpc_error *err = GRPC_ERROR_NONE; - - GPR_ASSERT(fd >= 0); - - if (so_reuseport && !grpc_is_unix_socket(addr)) { - err = grpc_set_socket_reuse_port(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - } - - err = grpc_set_socket_nonblocking(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - err = grpc_set_socket_cloexec(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - if (!grpc_is_unix_socket(addr)) { - err = grpc_set_socket_low_latency(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - err = grpc_set_socket_reuse_addr(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - } - err = grpc_set_socket_no_sigpipe_if_possible(fd); - if (err != GRPC_ERROR_NONE) goto error; - - GPR_ASSERT(addr->len < ~(socklen_t)0); - if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) { - err = GRPC_OS_ERROR(errno, "bind"); - goto error; - } - - if (listen(fd, get_max_accept_queue_size()) < 0) { - err = GRPC_OS_ERROR(errno, "listen"); - goto error; - } - - sockname_temp.len = sizeof(struct sockaddr_storage); - - if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, - (socklen_t *)&sockname_temp.len) < 0) { - err = GRPC_OS_ERROR(errno, "getsockname"); - goto error; - } - - *port = grpc_sockaddr_get_port(&sockname_temp); - return GRPC_ERROR_NONE; - -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (fd >= 0) { - close(fd); - } - grpc_error *ret = grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1), - GRPC_ERROR_INT_FD, fd); - GRPC_ERROR_UNREF(err); - return ret; -} - /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_tcp_listener *sp = arg; @@ -469,7 +298,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { error: gpr_mu_lock(&sp->server->mu); - if (0 == --sp->server->active_ports) { + if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); } else { @@ -477,216 +306,6 @@ error: } } -static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, - const grpc_resolved_address *addr, - unsigned port_index, unsigned fd_index, - grpc_tcp_listener **listener) { - grpc_tcp_listener *sp = NULL; - int port = -1; - char *addr_str; - char *name; - - grpc_error *err = prepare_socket(fd, addr, s->so_reuseport, &port); - if (err == GRPC_ERROR_NONE) { - GPR_ASSERT(port > 0); - grpc_sockaddr_to_string(&addr_str, addr, 1); - gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); - gpr_mu_lock(&s->mu); - s->nports++; - GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); - sp->next = NULL; - if (s->head == NULL) { - s->head = sp; - } else { - s->tail->next = sp; - } - s->tail = sp; - sp->server = s; - sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); - memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); - sp->port = port; - sp->port_index = port_index; - sp->fd_index = fd_index; - sp->is_sibling = 0; - sp->sibling = NULL; - GPR_ASSERT(sp->emfd); - gpr_mu_unlock(&s->mu); - gpr_free(addr_str); - gpr_free(name); - } - - *listener = sp; - return err; -} - -/* If successful, add a listener to s for addr, set *dsmode for the socket, and - return the *listener. */ -static grpc_error *add_addr_to_server(grpc_tcp_server *s, - const grpc_resolved_address *addr, - unsigned port_index, unsigned fd_index, - grpc_dualstack_mode *dsmode, - grpc_tcp_listener **listener) { - grpc_resolved_address addr4_copy; - int fd; - grpc_error *err = - grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd); - if (err != GRPC_ERROR_NONE) { - return err; - } - if (*dsmode == GRPC_DSMODE_IPV4 && - grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { - addr = &addr4_copy; - } - return add_socket_to_server(s, fd, addr, port_index, fd_index, listener); -} - -/* Bind to "::" to get a port number not used by any address. */ -static grpc_error *get_unused_port(int *port) { - grpc_resolved_address wild; - grpc_sockaddr_make_wildcard6(0, &wild); - grpc_dualstack_mode dsmode; - int fd; - grpc_error *err = - grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd); - if (err != GRPC_ERROR_NONE) { - return err; - } - if (dsmode == GRPC_DSMODE_IPV4) { - grpc_sockaddr_make_wildcard4(0, &wild); - } - if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) { - err = GRPC_OS_ERROR(errno, "bind"); - close(fd); - return err; - } - if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) != - 0) { - err = GRPC_OS_ERROR(errno, "getsockname"); - close(fd); - return err; - } - close(fd); - *port = grpc_sockaddr_get_port(&wild); - return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE; -} - -/* Return the listener in s with address addr or NULL. */ -static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s, - grpc_resolved_address *addr) { - grpc_tcp_listener *l; - gpr_mu_lock(&s->mu); - for (l = s->head; l != NULL; l = l->next) { - if (l->addr.len != addr->len) { - continue; - } - if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) { - break; - } - } - gpr_mu_unlock(&s->mu); - return l; -} - -/* Get all addresses assigned to network interfaces on the machine and create a - listener for each. requested_port is the port to use for every listener, or 0 - to select one random port that will be used for every listener. Set *out_port - to the port selected. Return GRPC_ERROR_NONE only if all listeners were - added. */ -static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s, - unsigned port_index, - int requested_port, - int *out_port) { - struct ifaddrs *ifa = NULL; - struct ifaddrs *ifa_it; - unsigned fd_index = 0; - grpc_tcp_listener *sp = NULL; - grpc_error *err = GRPC_ERROR_NONE; - if (requested_port == 0) { - /* Note: There could be a race where some local addrs can listen on the - selected port and some can't. The sane way to handle this would be to - retry by recreating the whole grpc_tcp_server. Backing out individual - listeners and orphaning the FDs looks like too much trouble. */ - if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) { - return err; - } else if (requested_port <= 0) { - return GRPC_ERROR_CREATE("Bad get_unused_port()"); - } - gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); - } - if (getifaddrs(&ifa) != 0 || ifa == NULL) { - return GRPC_OS_ERROR(errno, "getifaddrs"); - } - for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) { - grpc_resolved_address addr; - char *addr_str = NULL; - grpc_dualstack_mode dsmode; - grpc_tcp_listener *new_sp = NULL; - const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>"); - if (ifa_it->ifa_addr == NULL) { - continue; - } else if (ifa_it->ifa_addr->sa_family == AF_INET) { - addr.len = sizeof(struct sockaddr_in); - } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { - addr.len = sizeof(struct sockaddr_in6); - } else { - continue; - } - memcpy(addr.addr, ifa_it->ifa_addr, addr.len); - if (!grpc_sockaddr_set_port(&addr, requested_port)) { - /* Should never happen, because we check sa_family above. */ - err = GRPC_ERROR_CREATE("Failed to set port"); - break; - } - if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) { - addr_str = gpr_strdup("<error>"); - } - gpr_log(GPR_DEBUG, - "Adding local addr from interface %s flags 0x%x to server: %s", - ifa_name, ifa_it->ifa_flags, addr_str); - /* We could have multiple interfaces with the same address (e.g., bonding), - so look for duplicates. */ - if (find_listener_with_addr(s, &addr) != NULL) { - gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str, - ifa_name); - gpr_free(addr_str); - continue; - } - if ((err = add_addr_to_server(s, &addr, port_index, fd_index, &dsmode, - &new_sp)) != GRPC_ERROR_NONE) { - char *err_str = NULL; - grpc_error *root_err; - if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) { - err_str = gpr_strdup("Failed to add listener"); - } - root_err = GRPC_ERROR_CREATE(err_str); - gpr_free(err_str); - gpr_free(addr_str); - err = grpc_error_add_child(root_err, err); - break; - } else { - GPR_ASSERT(requested_port == new_sp->port); - ++fd_index; - if (sp != NULL) { - new_sp->is_sibling = 1; - sp->sibling = new_sp; - } - sp = new_sp; - } - gpr_free(addr_str); - } - freeifaddrs(ifa); - if (err != GRPC_ERROR_NONE) { - return err; - } else if (sp == NULL) { - return GRPC_ERROR_CREATE("No local addresses"); - } else { - *out_port = sp->port; - return GRPC_ERROR_NONE; - } -} - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, unsigned port_index, @@ -701,14 +320,16 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, grpc_error *v6_err = GRPC_ERROR_NONE; grpc_error *v4_err = GRPC_ERROR_NONE; *out_port = -1; - if (s->expand_wildcard_addrs) { - return add_all_local_addrs_to_server(s, port_index, requested_port, - out_port); + + if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) { + return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port, + out_port); } + grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6); /* Try listening on IPv6 first. */ - if ((v6_err = add_addr_to_server(s, &wild6, port_index, fd_index, &dsmode, - &sp)) == GRPC_ERROR_NONE) { + if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index, + &dsmode, &sp)) == GRPC_ERROR_NONE) { ++fd_index; requested_port = *out_port = sp->port; if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) { @@ -717,8 +338,8 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, } /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */ grpc_sockaddr_set_port(&wild4, requested_port); - if ((v4_err = add_addr_to_server(s, &wild4, port_index, fd_index, &dsmode, - &sp2)) == GRPC_ERROR_NONE) { + if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index, + &dsmode, &sp2)) == GRPC_ERROR_NONE) { *out_port = sp2->port; if (sp != NULL) { sp2->is_sibling = 1; @@ -726,12 +347,24 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, } } if (*out_port > 0) { - GRPC_LOG_IF_ERROR("Failed to add :: listener", v6_err); - GRPC_LOG_IF_ERROR("Failed to add 0.0.0.0 listener", v4_err); + if (v6_err != GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "Failed to add :: listener, " + "the environment may not support IPv6: %s", + grpc_error_string(v6_err)); + GRPC_ERROR_UNREF(v6_err); + } + if (v4_err != GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "Failed to add 0.0.0.0 listener, " + "the environment may not support IPv4: %s", + grpc_error_string(v4_err)); + GRPC_ERROR_UNREF(v4_err); + } return GRPC_ERROR_NONE; } else { - grpc_error *root_err = - GRPC_ERROR_CREATE("Failed to add any wildcard listeners"); + grpc_error *root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to add any wildcard listeners"); GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE); root_err = grpc_error_add_child(root_err, v6_err); root_err = grpc_error_add_child(root_err, v4_err); @@ -756,7 +389,7 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode, &fd); if (err != GRPC_ERROR_NONE) return err; - err = prepare_socket(fd, &listener->addr, true, &port); + err = grpc_tcp_server_prepare_socket(fd, &listener->addr, true, &port); if (err != GRPC_ERROR_NONE) return err; listener->server->nports++; grpc_sockaddr_to_string(&addr_str, &listener->addr, 1); @@ -828,7 +461,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { addr = &addr6_v4mapped; } - if ((err = add_addr_to_server(s, addr, port_index, 0, &dsmode, &sp)) == + if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) == GRPC_ERROR_NONE) { *out_port = sp->port; } @@ -951,7 +584,7 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { grpc_fd_shutdown(exec_ctx, sp->emfd, - GRPC_ERROR_CREATE("Server shutdown")); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown")); } } gpr_mu_unlock(&s->mu); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h new file mode 100644 index 0000000000..f5dc8532f9 --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -0,0 +1,134 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H +#define GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" +#include "src/core/lib/iomgr/tcp_server.h" + +/* one listening port */ +typedef struct grpc_tcp_listener { + int fd; + grpc_fd *emfd; + grpc_tcp_server *server; + grpc_resolved_address addr; + int port; + unsigned port_index; + unsigned fd_index; + grpc_closure read_closure; + grpc_closure destroyed_closure; + struct grpc_tcp_listener *next; + /* sibling is a linked list of all listeners for a given port. add_port and + clone_port place all new listeners in the same sibling list. A member of + the 'sibling' list is also a member of the 'next' list. The head of each + sibling list has is_sibling==0, and subsequent members of sibling lists + have is_sibling==1. is_sibling allows separate sibling lists to be + identified while iterating through 'next'. */ + struct grpc_tcp_listener *sibling; + int is_sibling; +} grpc_tcp_listener; + +/* the overall server */ +struct grpc_tcp_server { + gpr_refcount refs; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; + + gpr_mu mu; + + /* active port count: how many ports are actually still listening */ + size_t active_ports; + /* destroyed port count: how many ports are completely destroyed */ + size_t destroyed_ports; + + /* is this server shutting down? */ + bool shutdown; + /* have listeners been shutdown? */ + bool shutdown_listeners; + /* use SO_REUSEPORT */ + bool so_reuseport; + /* expand wildcard addresses to a list of all local addresses */ + bool expand_wildcard_addrs; + + /* linked list of server ports */ + grpc_tcp_listener *head; + grpc_tcp_listener *tail; + unsigned nports; + + /* List of closures passed to shutdown_starting_add(). */ + grpc_closure_list shutdown_starting; + + /* shutdown callback */ + grpc_closure *shutdown_complete; + + /* all pollsets interested in new connections */ + grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ + size_t pollset_count; + + /* next pollset to assign a channel to */ + gpr_atm next_pollset_to_assign; + + grpc_resource_quota *resource_quota; +}; + +/* If successful, add a listener to \a s for \a addr, set \a dsmode for the + socket, and return the \a listener. */ +grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_dualstack_mode *dsmode, + grpc_tcp_listener **listener); + +/* Get all addresses assigned to network interfaces on the machine and create a + listener for each. requested_port is the port to use for every listener, or 0 + to select one random port that will be used for every listener. Set *out_port + to the port selected. Return GRPC_ERROR_NONE only if all listeners were + added. */ +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port); + +/* Prepare a recently-created socket for listening. */ +grpc_error *grpc_tcp_server_prepare_socket(int fd, + const grpc_resolved_address *addr, + bool so_reuseport, int *port); +/* Ruturn true if the platform supports ifaddrs */ +bool grpc_tcp_server_have_ifaddrs(void); + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.c new file mode 100644 index 0000000000..af2b00b4b5 --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.c @@ -0,0 +1,221 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +#include <errno.h> +#include <limits.h> +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/unix_sockets_posix.h" + +#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 + +static gpr_once s_init_max_accept_queue_size; +static int s_max_accept_queue_size; + +/* get max listen queue size on linux */ +static void init_max_accept_queue_size(void) { + int n = SOMAXCONN; + char buf[64]; + FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r"); + if (fp == NULL) { + /* 2.4 kernel. */ + s_max_accept_queue_size = SOMAXCONN; + return; + } + if (fgets(buf, sizeof buf, fp)) { + char *end; + long i = strtol(buf, &end, 10); + if (i > 0 && i <= INT_MAX && end && *end == 0) { + n = (int)i; + } + } + fclose(fp); + s_max_accept_queue_size = n; + + if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { + gpr_log(GPR_INFO, + "Suspiciously small accept queue (%d) will probably lead to " + "connection drops", + s_max_accept_queue_size); + } +} + +static int get_max_accept_queue_size(void) { + gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size); + return s_max_accept_queue_size; +} + +static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_tcp_listener **listener) { + grpc_tcp_listener *sp = NULL; + int port = -1; + char *addr_str; + char *name; + + grpc_error *err = + grpc_tcp_server_prepare_socket(fd, addr, s->so_reuseport, &port); + if (err == GRPC_ERROR_NONE) { + GPR_ASSERT(port > 0); + grpc_sockaddr_to_string(&addr_str, addr, 1); + gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); + gpr_mu_lock(&s->mu); + s->nports++; + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); + sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; + } + s->tail = sp; + sp->server = s; + sp->fd = fd; + sp->emfd = grpc_fd_create(fd, name); + memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); + sp->port = port; + sp->port_index = port_index; + sp->fd_index = fd_index; + sp->is_sibling = 0; + sp->sibling = NULL; + GPR_ASSERT(sp->emfd); + gpr_mu_unlock(&s->mu); + gpr_free(addr_str); + gpr_free(name); + } + + *listener = sp; + return err; +} + +/* If successful, add a listener to s for addr, set *dsmode for the socket, and + return the *listener. */ +grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_dualstack_mode *dsmode, + grpc_tcp_listener **listener) { + grpc_resolved_address addr4_copy; + int fd; + grpc_error *err = + grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd); + if (err != GRPC_ERROR_NONE) { + return err; + } + if (*dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = &addr4_copy; + } + return add_socket_to_server(s, fd, addr, port_index, fd_index, listener); +} + +/* Prepare a recently-created socket for listening. */ +grpc_error *grpc_tcp_server_prepare_socket(int fd, + const grpc_resolved_address *addr, + bool so_reuseport, int *port) { + grpc_resolved_address sockname_temp; + grpc_error *err = GRPC_ERROR_NONE; + + GPR_ASSERT(fd >= 0); + + if (so_reuseport && !grpc_is_unix_socket(addr)) { + err = grpc_set_socket_reuse_port(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + } + + err = grpc_set_socket_nonblocking(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_cloexec(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + if (!grpc_is_unix_socket(addr)) { + err = grpc_set_socket_low_latency(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_reuse_addr(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + } + err = grpc_set_socket_no_sigpipe_if_possible(fd); + if (err != GRPC_ERROR_NONE) goto error; + + GPR_ASSERT(addr->len < ~(socklen_t)0); + if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) { + err = GRPC_OS_ERROR(errno, "bind"); + goto error; + } + + if (listen(fd, get_max_accept_queue_size()) < 0) { + err = GRPC_OS_ERROR(errno, "listen"); + goto error; + } + + sockname_temp.len = sizeof(struct sockaddr_storage); + + if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, + (socklen_t *)&sockname_temp.len) < 0) { + err = GRPC_OS_ERROR(errno, "getsockname"); + goto error; + } + + *port = grpc_sockaddr_get_port(&sockname_temp); + return GRPC_ERROR_NONE; + +error: + GPR_ASSERT(err != GRPC_ERROR_NONE); + if (fd >= 0) { + close(fd); + } + grpc_error *ret = + grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Unable to configure socket", &err, 1), + GRPC_ERROR_INT_FD, fd); + GRPC_ERROR_UNREF(err); + return ret; +} + +#endif /* GRPC_POSIX_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c new file mode 100644 index 0000000000..2078a68126 --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c @@ -0,0 +1,196 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_IFADDRS + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +#include <errno.h> +#include <ifaddrs.h> +#include <stddef.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" + +/* Return the listener in s with address addr or NULL. */ +static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s, + grpc_resolved_address *addr) { + grpc_tcp_listener *l; + gpr_mu_lock(&s->mu); + for (l = s->head; l != NULL; l = l->next) { + if (l->addr.len != addr->len) { + continue; + } + if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) { + break; + } + } + gpr_mu_unlock(&s->mu); + return l; +} + +/* Bind to "::" to get a port number not used by any address. */ +static grpc_error *get_unused_port(int *port) { + grpc_resolved_address wild; + grpc_sockaddr_make_wildcard6(0, &wild); + grpc_dualstack_mode dsmode; + int fd; + grpc_error *err = + grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd); + if (err != GRPC_ERROR_NONE) { + return err; + } + if (dsmode == GRPC_DSMODE_IPV4) { + grpc_sockaddr_make_wildcard4(0, &wild); + } + if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) { + err = GRPC_OS_ERROR(errno, "bind"); + close(fd); + return err; + } + if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) != + 0) { + err = GRPC_OS_ERROR(errno, "getsockname"); + close(fd); + return err; + } + close(fd); + *port = grpc_sockaddr_get_port(&wild); + return *port <= 0 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad port") + : GRPC_ERROR_NONE; +} + +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port) { + struct ifaddrs *ifa = NULL; + struct ifaddrs *ifa_it; + unsigned fd_index = 0; + grpc_tcp_listener *sp = NULL; + grpc_error *err = GRPC_ERROR_NONE; + if (requested_port == 0) { + /* Note: There could be a race where some local addrs can listen on the + selected port and some can't. The sane way to handle this would be to + retry by recreating the whole grpc_tcp_server. Backing out individual + listeners and orphaning the FDs looks like too much trouble. */ + if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) { + return err; + } else if (requested_port <= 0) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad get_unused_port()"); + } + gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); + } + if (getifaddrs(&ifa) != 0 || ifa == NULL) { + return GRPC_OS_ERROR(errno, "getifaddrs"); + } + for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) { + grpc_resolved_address addr; + char *addr_str = NULL; + grpc_dualstack_mode dsmode; + grpc_tcp_listener *new_sp = NULL; + const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>"); + if (ifa_it->ifa_addr == NULL) { + continue; + } else if (ifa_it->ifa_addr->sa_family == AF_INET) { + addr.len = sizeof(struct sockaddr_in); + } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { + addr.len = sizeof(struct sockaddr_in6); + } else { + continue; + } + memcpy(addr.addr, ifa_it->ifa_addr, addr.len); + if (!grpc_sockaddr_set_port(&addr, requested_port)) { + /* Should never happen, because we check sa_family above. */ + err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set port"); + break; + } + if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) { + addr_str = gpr_strdup("<error>"); + } + gpr_log(GPR_DEBUG, + "Adding local addr from interface %s flags 0x%x to server: %s", + ifa_name, ifa_it->ifa_flags, addr_str); + /* We could have multiple interfaces with the same address (e.g., bonding), + so look for duplicates. */ + if (find_listener_with_addr(s, &addr) != NULL) { + gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str, + ifa_name); + gpr_free(addr_str); + continue; + } + if ((err = grpc_tcp_server_add_addr(s, &addr, port_index, fd_index, &dsmode, + &new_sp)) != GRPC_ERROR_NONE) { + char *err_str = NULL; + grpc_error *root_err; + if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) { + err_str = gpr_strdup("Failed to add listener"); + } + root_err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_str); + gpr_free(err_str); + gpr_free(addr_str); + err = grpc_error_add_child(root_err, err); + break; + } else { + GPR_ASSERT(requested_port == new_sp->port); + ++fd_index; + if (sp != NULL) { + new_sp->is_sibling = 1; + sp->sibling = new_sp; + } + sp = new_sp; + } + gpr_free(addr_str); + } + freeifaddrs(ifa); + if (err != GRPC_ERROR_NONE) { + return err; + } else if (sp == NULL) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No local addresses"); + } else { + *out_port = sp->port; + return GRPC_ERROR_NONE; + } +} + +bool grpc_tcp_server_have_ifaddrs(void) { return true; } + +#endif /* GRPC_HAVE_IFADDRS */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c new file mode 100644 index 0000000000..d6a1a0693e --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c @@ -0,0 +1,49 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#if defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS) + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ifaddrs available"); +} + +bool grpc_tcp_server_have_ifaddrs(void) { return false; } + +#endif /* defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS) */ diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index eed2773f8a..e9246948f5 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -95,8 +95,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA - " must be a pointer to a buffer pool"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); } } } @@ -244,17 +244,19 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, // The last argument to uv_tcp_bind is flags status = uv_tcp_bind(handle, (struct sockaddr *)addr->addr, 0); if (status != 0) { - error = GRPC_ERROR_CREATE("Failed to bind to port"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to bind to port"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); return error; } status = uv_listen((uv_stream_t *)handle, SOMAXCONN, on_connect); if (status != 0) { - error = GRPC_ERROR_CREATE("Failed to listen to port"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to listen to port"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); return error; } @@ -262,9 +264,10 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, status = uv_tcp_getsockname(handle, (struct sockaddr *)&sockname_temp.addr, (int *)&sockname_temp.len); if (status != 0) { - error = GRPC_ERROR_CREATE("getsockname failed"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getsockname failed"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); return error; } @@ -346,15 +349,17 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, if (status == 0) { error = add_socket_to_server(s, handle, addr, port_index, &sp); } else { - error = GRPC_ERROR_CREATE("Failed to initialize UV tcp handle"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to initialize UV tcp handle"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); } gpr_free(allocated_addr); if (error != GRPC_ERROR_NONE) { - grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING( + grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); GRPC_ERROR_UNREF(error); error = error_out; diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index bd4b9b2df1..12ce7d3fdd 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -122,8 +122,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA - " must be a pointer to a buffer pool"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); } } } @@ -248,9 +248,10 @@ failure: GPR_ASSERT(error != GRPC_ERROR_NONE); char *tgtaddr = grpc_sockaddr_to_uri(addr); grpc_error_set_int( - grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING( + grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to prepare server socket", &error, 1), - GRPC_ERROR_STR_TARGET_ADDRESS, tgtaddr), + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(tgtaddr)), GRPC_ERROR_INT_FD, (intptr_t)sock); gpr_free(tgtaddr); GRPC_ERROR_UNREF(error); @@ -533,7 +534,7 @@ done: gpr_free(allocated_addr); if (error != GRPC_ERROR_NONE) { - grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING( + grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); GRPC_ERROR_UNREF(error); error = error_out; diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 5541c62068..8e8db9f7b4 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -152,7 +152,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // TODO(murgatroid99): figure out what the return value here means uv_read_stop(stream); if (nread == UV_EOF) { - error = GRPC_ERROR_CREATE("EOF"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"); } else if (nread > 0) { // Successful read sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); @@ -173,7 +173,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, } } else { // nread < 0: Error - error = GRPC_ERROR_CREATE("TCP Read failed"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"); } grpc_closure_sched(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); @@ -193,9 +193,10 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, status = uv_read_start((uv_stream_t *)tcp->handle, alloc_uv_buf, read_callback); if (status != 0) { - error = GRPC_ERROR_CREATE("TCP Read failed at start"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); grpc_closure_sched(exec_ctx, cb, error); } if (grpc_tcp_trace) { @@ -214,7 +215,7 @@ static void write_callback(uv_write_t *req, int status) { if (status == 0) { error = GRPC_ERROR_NONE; } else { - error = GRPC_ERROR_CREATE("TCP Write failed"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Write failed"); } if (grpc_tcp_trace) { const char *str = grpc_error_string(error); @@ -249,8 +250,8 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down")); + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "TCP socket is shutting down")); return; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 6c413971e3..9134883226 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -175,7 +175,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { if (error == GRPC_ERROR_NONE) { if (info->wsa_error != 0 && !tcp->shutting_down) { char *utf8_message = gpr_format_message(info->wsa_error); - error = GRPC_ERROR_CREATE(utf8_message); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message); gpr_free(utf8_message); grpc_slice_unref_internal(exec_ctx, tcp->read_slice); } else { @@ -185,9 +185,9 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } else { grpc_slice_unref_internal(exec_ctx, tcp->read_slice); error = tcp->shutting_down - ? GRPC_ERROR_CREATE_REFERENCING("TCP stream shutting down", - &tcp->shutdown_error, 1) - : GRPC_ERROR_CREATE("End of TCP stream"); + ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "TCP stream shutting down", &tcp->shutdown_error, 1) + : GRPC_ERROR_CREATE_FROM_STATIC_STRING("End of TCP stream"); } } } @@ -208,9 +208,10 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING( - "TCP socket is shutting down", - &tcp->shutdown_error, 1)); + grpc_closure_sched( + exec_ctx, cb, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "TCP socket is shutting down", &tcp->shutdown_error, 1)); return; } @@ -297,9 +298,10 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING( - "TCP socket is shutting down", - &tcp->shutdown_error, 1)); + grpc_closure_sched( + exec_ctx, cb, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "TCP socket is shutting down", &tcp->shutdown_error, 1)); return; } diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index d4df96c214..e53c801929 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -109,8 +109,9 @@ void grpc_timer_list_init(gpr_timespec now) { void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; - run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, - GRPC_ERROR_CREATE("Timer list shutdown")); + run_some_expired_timers( + exec_ctx, gpr_inf_future(g_clock_type), NULL, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); @@ -182,9 +183,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (!g_initialized) { timer->pending = false; - grpc_closure_sched( - exec_ctx, timer->closure, - GRPC_ERROR_CREATE("Attempt to create timer before initialization")); + grpc_closure_sched(exec_ctx, timer->closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Attempt to create timer before initialization")); return; } @@ -376,7 +377,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, exec_ctx, now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE("Shutting down timer system")); + : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system")); } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index f28a14405d..8e8a07578c 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -78,6 +78,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_timer->data = timer; timer->uv_timer = uv_timer; uv_timer_start(uv_timer, run_expired_timer, timeout, 0); + /* We assume that gRPC timers are only used alongside other active gRPC + objects, and that there will therefore always be something else keeping + the uv loop alive whenever there is a timer */ + uv_unref((uv_handle_t *)uv_timer); } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 71e295770a..60579e18ba 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -59,11 +59,13 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" @@ -89,6 +91,9 @@ struct grpc_udp_listener { struct grpc_udp_server { gpr_mu mu; + /* factory to use for creating and binding sockets, or NULL */ + grpc_socket_factory *socket_factory; + /* active port count: how many ports are actually still listening */ size_t active_ports; /* destroyed port count: how many ports are completely destroyed */ @@ -113,9 +118,24 @@ struct grpc_udp_server { void *user_data; }; -grpc_udp_server *grpc_udp_server_create(void) { +static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) { + if (args) { + const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); + if (arg) { + GPR_ASSERT(arg->type == GRPC_ARG_POINTER); + return arg->value.pointer.p; + } + } + return NULL; +} + +grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); + s->socket_factory = get_socket_factory(args); + if (s->socket_factory) { + grpc_socket_factory_ref(s->socket_factory); + } s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; @@ -139,6 +159,10 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { gpr_free(sp); } + if (s->socket_factory) { + grpc_socket_factory_unref(s->socket_factory); + } + gpr_free(s); } @@ -162,10 +186,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); - if (!s->shutdown) { - gpr_mu_unlock(&s->mu); - return; - } + GPR_ASSERT(s->shutdown); if (s->head) { grpc_udp_listener *sp; @@ -205,8 +226,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data); - grpc_fd_shutdown(exec_ctx, sp->emfd, - GRPC_ERROR_CREATE("Server destroyed")); + grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Server destroyed")); } gpr_mu_unlock(&s->mu); } else { @@ -215,8 +236,17 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, } } +static int bind_socket(grpc_socket_factory *socket_factory, int sockfd, + const grpc_resolved_address *addr) { + return (socket_factory != NULL) + ? grpc_socket_factory_bind(socket_factory, sockfd, addr) + : bind(sockfd, (struct sockaddr *)addr->addr, + (socklen_t)addr->len); +} + /* Prepare a recently-created socket for listening. */ -static int prepare_socket(int fd, const grpc_resolved_address *addr) { +static int prepare_socket(grpc_socket_factory *socket_factory, int fd, + const grpc_resolved_address *addr) { grpc_resolved_address sockname_temp; struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr; /* Set send/receive socket buffers to 1 MB */ @@ -246,7 +276,7 @@ static int prepare_socket(int fd, const grpc_resolved_address *addr) { } GPR_ASSERT(addr->len < ~(socklen_t)0); - if (bind(fd, (struct sockaddr *)addr, (socklen_t)addr->len) < 0) { + if (bind_socket(socket_factory, fd, addr) < 0) { char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); @@ -288,7 +318,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { - if (0 == --sp->server->active_ports) { + if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); } else { @@ -311,7 +341,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_lock(&(sp->server->mu)); if (error != GRPC_ERROR_NONE) { - if (0 == --sp->server->active_ports) { + if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); } else { @@ -339,7 +369,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, char *addr_str; char *name; - port = prepare_socket(fd, addr); + port = prepare_socket(s->socket_factory, fd, addr); if (port >= 0) { grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); @@ -417,8 +447,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, /* Try listening on IPv6 first. */ addr = &wild6; // TODO(rjshade): Test and propagate the returned grpc_error*: - GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, - &dsmode, &fd)); + GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( + s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { @@ -433,8 +463,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, } // TODO(rjshade): Test and propagate the returned grpc_error*: - GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, - &dsmode, &fd)); + GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( + s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); if (fd < 0) { gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 90842a47f0..9df3fe4d1f 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -58,7 +58,7 @@ typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data); /* Create a server, initially not bound to any ports */ -grpc_udp_server *grpc_udp_server_create(void); +grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args); /* Start listening to bound ports. user_data is passed to callbacks. */ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c index 1233cec04e..281865aece 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.c @@ -60,7 +60,7 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name, gpr_asprintf(&err_msg, "Path name should not have more than %" PRIuPTR " characters.", GPR_ARRAY_SIZE(un->sun_path) - 1); - err = GRPC_ERROR_CREATE(err_msg); + err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg); gpr_free(err_msg); return err; } diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.c index 1daf5152c1..b9602cbf8b 100644 --- a/src/core/lib/iomgr/unix_sockets_posix_noop.c +++ b/src/core/lib/iomgr/unix_sockets_posix_noop.c @@ -47,7 +47,8 @@ void grpc_create_socketpair_if_unix(int sv[2]) { grpc_error *grpc_resolve_unix_domain_address( const char *name, grpc_resolved_addresses **addresses) { *addresses = NULL; - return GRPC_ERROR_CREATE("Unix domain sockets are not supported on Windows"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Unix domain sockets are not supported on Windows"); } int grpc_is_unix_socket(const grpc_resolved_address *addr) { return false; } diff --git a/src/core/lib/profiling/basic_timers.c b/src/core/lib/profiling/basic_timers.c index 1f1987fb8e..bc8e27714c 100644 --- a/src/core/lib/profiling/basic_timers.c +++ b/src/core/lib/profiling/basic_timers.c @@ -218,7 +218,7 @@ void gpr_timers_set_log_filename(const char *filename) { static void init_output() { gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); - gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options); + GPR_ASSERT(gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options)); atexit(finish_writing); } diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index dd44621347..97501e6788 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -180,7 +180,7 @@ static grpc_error *create_default_creds_from_path( grpc_slice creds_data = grpc_empty_slice(); grpc_error *error = GRPC_ERROR_NONE; if (creds_path == NULL) { - error = GRPC_ERROR_CREATE("creds_path unset"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("creds_path unset"); goto end; } error = grpc_load_file(creds_path, 0, &creds_data); @@ -190,10 +190,9 @@ static grpc_error *create_default_creds_from_path( json = grpc_json_parse_string_with_len( (char *)GRPC_SLICE_START_PTR(creds_data), GRPC_SLICE_LENGTH(creds_data)); if (json == NULL) { - char *dump = grpc_dump_slice(creds_data, GPR_DUMP_HEX | GPR_DUMP_ASCII); - error = grpc_error_set_str(GRPC_ERROR_CREATE("Failed to parse JSON"), - GRPC_ERROR_STR_RAW_BYTES, dump); - gpr_free(dump); + error = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to parse JSON"), + GRPC_ERROR_STR_RAW_BYTES, grpc_slice_ref_internal(creds_data)); goto end; } @@ -204,7 +203,7 @@ static grpc_error *create_default_creds_from_path( grpc_service_account_jwt_access_credentials_create_from_auth_json_key( exec_ctx, key, grpc_max_auth_token_lifetime()); if (result == NULL) { - error = GRPC_ERROR_CREATE( + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "grpc_service_account_jwt_access_credentials_create_from_auth_json_" "key failed"); } @@ -217,7 +216,7 @@ static grpc_error *create_default_creds_from_path( result = grpc_refresh_token_credentials_create_from_auth_refresh_token(token); if (result == NULL) { - error = GRPC_ERROR_CREATE( + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "grpc_refresh_token_credentials_create_from_auth_refresh_token " "failed"); } @@ -236,7 +235,8 @@ end: grpc_channel_credentials *grpc_google_default_credentials_create(void) { grpc_channel_credentials *result = NULL; grpc_call_credentials *call_creds = NULL; - grpc_error *error = GRPC_ERROR_CREATE("Failed to create Google credentials"); + grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to create Google credentials"); grpc_error *err; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -274,7 +274,8 @@ grpc_channel_credentials *grpc_google_default_credentials_create(void) { call_creds = grpc_google_compute_engine_credentials_create(NULL); if (call_creds == NULL) { error = grpc_error_add_child( - error, GRPC_ERROR_CREATE("Failed to get credentials from network")); + error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to get credentials from network")); } } } diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 8dea1d98ff..8f321b9911 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -95,7 +95,8 @@ static void reset_auth_metadata_context( static void add_error(grpc_error **combined, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; if (*combined == GRPC_ERROR_NONE) { - *combined = GRPC_ERROR_CREATE("Client auth metadata plugin error"); + *combined = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Client auth metadata plugin error"); } *combined = grpc_error_add_child(*combined, error); } @@ -114,9 +115,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error = GRPC_ERROR_NONE; if (status != GRPC_CREDENTIALS_OK) { error = grpc_error_set_int( - GRPC_ERROR_CREATE(error_details != NULL && strlen(error_details) > 0 - ? error_details - : "Credentials failed to get metadata."), + GRPC_ERROR_CREATE_FROM_COPIED_STRING( + error_details != NULL && strlen(error_details) > 0 + ? error_details + : "Credentials failed to get metadata."), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); } else { GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); @@ -192,7 +194,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_finish_with_failure( exec_ctx, op, grpc_error_set_int( - GRPC_ERROR_CREATE( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED)); return; @@ -225,9 +227,10 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, host); gpr_free(host); grpc_call_element_signal_error( - exec_ctx, elem, grpc_error_set_int(GRPC_ERROR_CREATE(error_msg), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAUTHENTICATED)); + exec_ctx, elem, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAUTHENTICATED)); gpr_free(error_msg); } } diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 7d58843d69..568d70fa38 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -162,7 +162,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); - call_read_cb(exec_ctx, ep, GRPC_ERROR_CREATE_REFERENCING( + call_read_cb(exec_ctx, ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Secure read failed", &error, 1)); return; } @@ -220,8 +220,10 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, if (result != TSI_OK) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); - call_read_cb(exec_ctx, ep, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Unwrap failed"), result)); + call_read_cb( + exec_ctx, ep, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result)); return; } @@ -332,7 +334,8 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer); grpc_closure_sched( exec_ctx, cb, - grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result)); + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); GPR_TIMER_END("secure_endpoint.endpoint_write", 0); return; } diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index ad083a730f..b0cbc83639 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -137,9 +137,9 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_auth_context **auth_context, grpc_closure *on_peer_checked) { if (sc == NULL) { - grpc_closure_sched( - exec_ctx, on_peer_checked, - GRPC_ERROR_CREATE("cannot check peer -- no security connector")); + grpc_closure_sched(exec_ctx, on_peer_checked, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "cannot check peer -- no security connector")); tsi_peer_destruct(&peer); } else { sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); @@ -330,7 +330,8 @@ static void fake_check_peer(grpc_exec_ctx *exec_ctx, grpc_error *error = GRPC_ERROR_NONE; *auth_context = NULL; if (peer.property_count != 1) { - error = GRPC_ERROR_CREATE("Fake peers should only have 1 property."); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Fake peers should only have 1 property."); goto end; } prop_name = peer.properties[0].name; @@ -339,13 +340,14 @@ static void fake_check_peer(grpc_exec_ctx *exec_ctx, char *msg; gpr_asprintf(&msg, "Unexpected property in fake peer: %s.", prop_name == NULL ? "<EMPTY>" : prop_name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); goto end; } if (strncmp(peer.properties[0].value.data, TSI_FAKE_CERTIFICATE_TYPE, peer.properties[0].value.length)) { - error = GRPC_ERROR_CREATE("Invalid value for cert type property."); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Invalid value for cert type property."); goto end; } *auth_context = grpc_auth_context_create(NULL); @@ -586,18 +588,19 @@ static grpc_error *ssl_check_peer(grpc_security_connector *sc, const tsi_peer_property *p = tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL); if (p == NULL) { - return GRPC_ERROR_CREATE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Cannot check peer: missing selected ALPN property."); } if (!grpc_chttp2_is_alpn_version_supported(p->value.data, p->value.length)) { - return GRPC_ERROR_CREATE("Cannot check peer: invalid ALPN value."); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Cannot check peer: invalid ALPN value."); } /* Check the peer name if specified. */ if (peer_name != NULL && !ssl_host_matches_name(peer, peer_name)) { char *msg; gpr_asprintf(&msg, "Peer name %s is not in peer certificate", peer_name); - grpc_error *error = GRPC_ERROR_CREATE(msg); + grpc_error *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return error; } diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 7065d261ba..2f39327670 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -120,7 +120,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, if (error == GRPC_ERROR_NONE) { // If we were shut down after the handshake succeeded but before an // endpoint callback was invoked, we need to generate our own error. - error = GRPC_ERROR_CREATE("Handshaker shutdown"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshaker shutdown"); } const char *msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg); @@ -156,7 +156,8 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); if (result != TSI_OK) { error = grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Frame protector creation failed"), result); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), + result); security_handshake_failed_locked(exec_ctx, h, error); goto done; } @@ -191,7 +192,7 @@ static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); if (result != TSI_OK) { return grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Peer extraction failed"), result); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result); } grpc_security_connector_check_peer(exec_ctx, h->connector, peer, &h->auth_context, &h->on_peer_checked); @@ -215,8 +216,8 @@ static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx, } } while (result == TSI_INCOMPLETE_DATA); if (result != TSI_OK) { - return grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Handshake failed"), - result); + return grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result); } // Send data. grpc_slice to_send = @@ -234,8 +235,8 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( - exec_ctx, h, - GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1)); + exec_ctx, h, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Handshake read failed", &error, 1)); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); return; @@ -270,8 +271,9 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, } if (result != TSI_OK) { security_handshake_failed_locked( - exec_ctx, h, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Handshake failed"), result)); + exec_ctx, h, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result)); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); return; @@ -314,8 +316,8 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( - exec_ctx, h, - GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1)); + exec_ctx, h, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Handshake write failed", &error, 1)); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); return; @@ -429,7 +431,8 @@ static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, grpc_closure *on_handshake_done, grpc_handshaker_args *args) { grpc_closure_sched(exec_ctx, on_handshake_done, - GRPC_ERROR_CREATE("Failed to create security handshaker")); + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to create security handshaker")); } static const grpc_handshaker_vtable fail_handshaker_vtable = { diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 01cb473177..3cf0632220 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -144,9 +144,10 @@ static void on_md_processing_done( calld->transport_op->send_message = NULL; } calld->transport_op->send_trailing_metadata = NULL; - grpc_closure_sched(&exec_ctx, calld->on_done_recv, - grpc_error_set_int(GRPC_ERROR_CREATE(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status)); + grpc_closure_sched( + &exec_ctx, calld->on_done_recv, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status)); } grpc_exec_ctx_finish(&exec_ctx); @@ -158,7 +159,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (error == GRPC_ERROR_NONE) { - if (chand->creds->processor.process != NULL) { + if (chand->creds != NULL && chand->creds->processor.process != NULL) { calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata); chand->creds->processor.process( chand->creds->processor.state, calld->auth_context, @@ -242,7 +243,6 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->is_last); GPR_ASSERT(auth_context != NULL); - GPR_ASSERT(creds != NULL); /* initialize members */ chand->auth_context = diff --git a/src/core/lib/security/transport/tsi_error.c b/src/core/lib/security/transport/tsi_error.c index afc1733567..eae0a676b0 100644 --- a/src/core/lib/security/transport/tsi_error.c +++ b/src/core/lib/security/transport/tsi_error.c @@ -34,7 +34,9 @@ #include "src/core/lib/security/transport/tsi_error.h" grpc_error *grpc_set_tsi_error_result(grpc_error *error, tsi_result result) { - return grpc_error_set_int(grpc_error_set_str(error, GRPC_ERROR_STR_TSI_ERROR, - tsi_result_to_string(result)), - GRPC_ERROR_INT_TSI_CODE, result); + return grpc_error_set_int( + grpc_error_set_str( + error, GRPC_ERROR_STR_TSI_ERROR, + grpc_slice_from_static_string(tsi_result_to_string(result))), + GRPC_ERROR_INT_TSI_CODE, result); } diff --git a/src/core/lib/security/util/b64.c b/src/core/lib/security/util/b64.c index 09c8213131..0d5a917660 100644 --- a/src/core/lib/security/util/b64.c +++ b/src/core/lib/security/util/b64.c @@ -71,15 +71,31 @@ static const char base64_url_safe_chars[] = char *grpc_base64_encode(const void *vdata, size_t data_size, int url_safe, int multiline) { - const unsigned char *data = vdata; - const char *base64_chars = - url_safe ? base64_url_safe_chars : base64_url_unsafe_chars; + size_t result_projected_size = + grpc_base64_estimate_encoded_size(data_size, url_safe, multiline); + char *result = gpr_malloc(result_projected_size); + grpc_base64_encode_core(result, vdata, data_size, url_safe, multiline); + return result; +} + +size_t grpc_base64_estimate_encoded_size(size_t data_size, int url_safe, + int multiline) { size_t result_projected_size = 4 * ((data_size + 3) / 3) + 2 * (multiline ? (data_size / (3 * GRPC_BASE64_MULTILINE_NUM_BLOCKS)) : 0) + 1; - char *result = gpr_malloc(result_projected_size); + return result_projected_size; +} + +void grpc_base64_encode_core(char *result, const void *vdata, size_t data_size, + int url_safe, int multiline) { + const unsigned char *data = vdata; + const char *base64_chars = + url_safe ? base64_url_safe_chars : base64_url_unsafe_chars; + const size_t result_projected_size = + grpc_base64_estimate_encoded_size(data_size, url_safe, multiline); + char *current = result; size_t num_blocks = 0; size_t i = 0; @@ -119,7 +135,6 @@ char *grpc_base64_encode(const void *vdata, size_t data_size, int url_safe, GPR_ASSERT(current >= result); GPR_ASSERT((uintptr_t)(current - result) < result_projected_size); result[current - result] = '\0'; - return result; } grpc_slice grpc_base64_decode(grpc_exec_ctx *exec_ctx, const char *b64, diff --git a/src/core/lib/security/util/b64.h b/src/core/lib/security/util/b64.h index d42a136f61..ef52291c6a 100644 --- a/src/core/lib/security/util/b64.h +++ b/src/core/lib/security/util/b64.h @@ -37,10 +37,22 @@ #include <grpc/slice.h> /* Encodes data using base64. It is the caller's responsability to free - the returned char * using gpr_free. Returns NULL on NULL input. */ + the returned char * using gpr_free. Returns NULL on NULL input. + TODO(makdharma) : change the flags to bool from int */ char *grpc_base64_encode(const void *data, size_t data_size, int url_safe, int multiline); +/* estimate the upper bound on size of base64 encoded data. The actual size + * is guaranteed to be less than or equal to the size returned here. */ +size_t grpc_base64_estimate_encoded_size(size_t data_size, int url_safe, + int multiline); + +/* Encodes data using base64 and write it to memory pointed to by result. It is + * the caller's responsiblity to allocate enough memory in |result| to fit the + * encoded data. */ +void grpc_base64_encode_core(char *result, const void *vdata, size_t data_size, + int url_safe, int multiline); + /* Decodes data according to the base64 specification. Returns an empty slice in case of failure. */ grpc_slice grpc_base64_decode(grpc_exec_ctx *exec_ctx, const char *b64, diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c new file mode 100644 index 0000000000..06e8432caf --- /dev/null +++ b/src/core/lib/support/atm.c @@ -0,0 +1,47 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/atm.h> +#include <grpc/support/useful.h> + +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max) { + gpr_atm current; + gpr_atm new; + do { + current = gpr_atm_no_barrier_load(value); + new = GPR_CLAMP(current + delta, min, max); + if (new == current) break; + } while (!gpr_atm_no_barrier_cas(value, current, new)); + return new; +} diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2c5d8c0ff3..9342c5f8e9 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -264,7 +264,7 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, static void add_init_error(grpc_error **composite, grpc_error *new) { if (new == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) - *composite = GRPC_ERROR_CREATE("Call creation failed"); + *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed"); *composite = grpc_error_add_child(*composite, new); } @@ -335,17 +335,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, * call. */ if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) { - add_init_error(&error, - GRPC_ERROR_CREATE("Census tracing propagation requested " - "without Census context propagation")); + add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Census tracing propagation requested " + "without Census context propagation")); } grpc_call_context_set( call, GRPC_CONTEXT_TRACING, args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { - add_init_error(&error, - GRPC_ERROR_CREATE("Census context propagation requested " - "without Census tracing propagation")); + add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Census context propagation requested " + "without Census tracing propagation")); } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; @@ -603,8 +603,9 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, static grpc_error *error_from_status(grpc_status_code status, const char *description) { return grpc_error_set_int( - grpc_error_set_str(GRPC_ERROR_CREATE(description), - GRPC_ERROR_STR_GRPC_MESSAGE, description), + grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description), + GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_from_copied_string(description)), GRPC_ERROR_INT_GRPC_STATUS, status); } @@ -624,16 +625,15 @@ static bool get_final_status_from( void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { grpc_status_code code; - const char *msg = NULL; - grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL); + grpc_slice slice = grpc_empty_slice(); + grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } set_value(code, set_value_user_data); if (details != NULL) { - *details = - msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg); + *details = grpc_slice_ref_internal(slice); } return true; } @@ -896,18 +896,19 @@ static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_error *error = status_code == GRPC_STATUS_OK ? GRPC_ERROR_NONE - : grpc_error_set_int(GRPC_ERROR_CREATE("Error received from peer"), + : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Error received from peer"), GRPC_ERROR_INT_GRPC_STATUS, (intptr_t)status_code); if (b->idx.named.grpc_message != NULL) { - char *msg = - grpc_slice_to_c_string(GRPC_MDVALUE(b->idx.named.grpc_message->md)); - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); - gpr_free(msg); + error = grpc_error_set_str( + error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md))); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); } else if (error != GRPC_ERROR_NONE) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, ""); + error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_empty_slice()); } set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); @@ -1056,8 +1057,8 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) { bctl->errors[0] = NULL; return e; } else { - grpc_error *error = - GRPC_ERROR_CREATE_REFERENCING("Call batch failed", bctl->errors, n); + grpc_error *error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Call batch failed", bctl->errors, n); for (size_t i = 0; i < n; i++) { GRPC_ERROR_UNREF(bctl->errors[i]); bctl->errors[i] = NULL; @@ -1521,7 +1522,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, { grpc_error *override_error = GRPC_ERROR_NONE; if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { - override_error = GRPC_ERROR_CREATE("Error from server send status"); + override_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Error from server send status"); } if (op->data.send_status_from_server.status_details != NULL) { call->send_extra_metadata[1].md = grpc_mdelem_from_slices( @@ -1531,8 +1533,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->send_extra_metadata_count++; char *msg = grpc_slice_to_c_string( GRPC_MDVALUE(call->send_extra_metadata[1].md)); - override_error = grpc_error_set_str( - override_error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); + override_error = + grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)); gpr_free(msg); } set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 2b700b2f67..b4bfb92042 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -85,19 +85,10 @@ struct grpc_channel { static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, - const grpc_channel_args *input_args, - grpc_channel_stack_type channel_stack_type, - grpc_transport *optional_transport) { - grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); - grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder, - input_args); - grpc_channel_stack_builder_set_target(builder, target); - grpc_channel_stack_builder_set_transport(builder, optional_transport); - if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) { - grpc_channel_stack_builder_destroy(exec_ctx, builder); - return NULL; - } +grpc_channel *grpc_channel_create_with_builder( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + grpc_channel_stack_type channel_stack_type) { + char *target = gpr_strdup(grpc_channel_stack_builder_get_target(builder)); grpc_channel_args *args = grpc_channel_args_copy( grpc_channel_stack_builder_get_channel_arguments(builder)); grpc_channel *channel; @@ -108,11 +99,12 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, gpr_log(GPR_ERROR, "channel stack builder failed: %s", grpc_error_string(error)); GRPC_ERROR_UNREF(error); + gpr_free(target); goto done; } memset(channel, 0, sizeof(*channel)); - channel->target = gpr_strdup(target); + channel->target = target; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; @@ -183,10 +175,33 @@ done: return channel; } +grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, + const grpc_channel_args *input_args, + grpc_channel_stack_type channel_stack_type, + grpc_transport *optional_transport) { + grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder, + input_args); + grpc_channel_stack_builder_set_target(builder, target); + grpc_channel_stack_builder_set_transport(builder, optional_transport); + if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) { + grpc_channel_stack_builder_destroy(exec_ctx, builder); + return NULL; + } + return grpc_channel_create_with_builder(exec_ctx, builder, + channel_stack_type); +} + size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) { #define ROUND_UP_SIZE 256 + /* We round up our current estimate to the NEXT value of ROUND_UP_SIZE. + This ensures: + 1. a consistent size allocation when our estimate is drifting slowly + (which is common) - which tends to help most allocators reuse memory + 2. a small amount of allowed growth over the estimate without hitting + the arena size doubling case, reducing overall memory usage */ return ((size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate) + - ROUND_UP_SIZE) & + 2 * ROUND_UP_SIZE) & ~(size_t)(ROUND_UP_SIZE - 1); } @@ -380,7 +395,8 @@ void grpc_channel_destroy(grpc_channel *channel) { grpc_channel_element *elem; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); - op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); + op->disconnect_with_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Destroyed"); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); elem->filter->start_transport_op(&exec_ctx, elem, op); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index c4aebd8b9b..0f203a3e59 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -35,6 +35,7 @@ #define GRPC_CORE_LIB_SURFACE_CHANNEL_H #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/surface/channel_stack_type.h" grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, @@ -42,6 +43,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, grpc_channel_stack_type channel_stack_type, grpc_transport *optional_transport); +grpc_channel *grpc_channel_create_with_builder( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + grpc_channel_stack_type channel_stack_type); + /** Create a call given a grpc_channel, in order to call \a method. Progress is tied to activity on \a pollset_set. The returned call object is meant to be used with \a grpc_call_start_batch_and_execute, which relies on diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c new file mode 100644 index 0000000000..db67a5192b --- /dev/null +++ b/src/core/lib/surface/completion_queue_factory.c @@ -0,0 +1,77 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/surface/completion_queue_factory.h" +#include "src/core/lib/surface/completion_queue.h" + +#include <grpc/support/log.h> + +/* TODO (sreek) - Currently this does not use the attributes arg. This will be + added in a future PR */ +static grpc_completion_queue* default_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attributes) { + return grpc_completion_queue_create(NULL); +} + +static grpc_completion_queue_factory_vtable default_vtable = {default_create}; + +static const grpc_completion_queue_factory g_default_cq_factory = { + "Default Factory", NULL, &default_vtable}; + +const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) { + /* As we add more fields to grpc_completion_queue_attributes, we may have to + change this assert to: + GPR_ASSERT (attributes->version >= 1 && + attributes->version <= GRPC_CQ_CURRENT_VERSION) */ + GPR_ASSERT(attributes->version == 1); + + /* The default factory can handle version 1 of the attributes structure. We + may have to change this as more fields are added to the structure */ + return &g_default_cq_factory; +} + +grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) { + GPR_ASSERT(!reserved); + grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT, + GRPC_CQ_DEFAULT_POLLING}; + return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); +} + +grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { + GPR_ASSERT(!reserved); + grpc_completion_queue_attributes attr = {1, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}; + return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); +} diff --git a/src/core/lib/surface/completion_queue_factory.h b/src/core/lib/surface/completion_queue_factory.h new file mode 100644 index 0000000000..57e90b5090 --- /dev/null +++ b/src/core/lib/surface/completion_queue_factory.h @@ -0,0 +1,51 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H +#define GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H + +#include <grpc/grpc.h> +#include "src/core/lib/surface/completion_queue.h" + +typedef struct grpc_completion_queue_factory_vtable { + grpc_completion_queue* (*create)(const grpc_completion_queue_factory*, + const grpc_completion_queue_attributes*); +} grpc_completion_queue_factory_vtable; + +struct grpc_completion_queue_factory { + const char* name; + void* data; /* Factory specific data */ + grpc_completion_queue_factory_vtable* vtable; +}; + +#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H */ diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 9ddb88bd11..0c408aa288 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -90,7 +90,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, fill_metadata(exec_ctx, elem, op->recv_trailing_metadata); } grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, GRPC_ERROR_CREATE("lame client channel")); + exec_ctx, op, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -111,8 +112,9 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); } if (op->send_ping != NULL) { - grpc_closure_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("lame client channel")); + grpc_closure_sched( + exec_ctx, op->send_ping, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } GRPC_ERROR_UNREF(op->disconnect_with_error); if (op->on_consumed != NULL) { diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 1186a4af63..a123c9ca43 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -288,10 +288,10 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_channel_element *elem; op->goaway_error = - send_goaway - ? grpc_error_set_int(GRPC_ERROR_CREATE("Server shutdown"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK) - : GRPC_ERROR_NONE; + send_goaway ? grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK) + : GRPC_ERROR_NONE; op->set_accept_stream = true; sc->slice = grpc_slice_from_copied_string("Server shutdown"); op->disconnect_with_error = send_disconnect; @@ -712,8 +712,9 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, return; } - kill_pending_work_locked(exec_ctx, server, - GRPC_ERROR_CREATE("Server Shutdown")); + kill_pending_work_locked( + exec_ctx, server, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { @@ -771,8 +772,8 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, /* do nothing */ } else { grpc_error *src_error = error; - error = - GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Missing :authority or :path", &error, 1); GRPC_ERROR_UNREF(src_error); } @@ -1219,7 +1220,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, op->on_connectivity_state_change = &chand->channel_connectivity_changed; op->connectivity_state = &chand->connectivity_state; if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { - op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); + op->disconnect_with_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"); } grpc_transport_perform_op(exec_ctx, transport, op); } @@ -1277,8 +1279,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - kill_pending_work_locked(&exec_ctx, server, - GRPC_ERROR_CREATE("Server Shutdown")); + kill_pending_work_locked( + &exec_ctx, server, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); gpr_mu_unlock(&server->mu_call); maybe_finish_shutdown(&exec_ctx, server); @@ -1308,8 +1311,9 @@ void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster_init(server, &broadcaster); gpr_mu_unlock(&server->mu_global); - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, false /* send_goaway */, - GRPC_ERROR_CREATE("Cancelling all calls")); + channel_broadcaster_shutdown( + &exec_ctx, &broadcaster, false /* send_goaway */, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls")); grpc_exec_ctx_finish(&exec_ctx); } @@ -1357,16 +1361,16 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, - GRPC_ERROR_CREATE("Server Shutdown")); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); if (request_id == -1) { /* out of request ids: just fail this one */ fail_call(exec_ctx, server, cq_idx, rc, - grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"), - GRPC_ERROR_INT_LIMIT, - server->max_requested_calls_per_cq)); + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), + GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); return GRPC_CALL_OK; } switch (rc->type) { diff --git a/src/core/lib/surface/validate_metadata.c b/src/core/lib/surface/validate_metadata.c index 7ec9137265..6e76c4efe7 100644 --- a/src/core/lib/surface/validate_metadata.c +++ b/src/core/lib/surface/validate_metadata.c @@ -39,6 +39,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" static grpc_error *conforms_to(grpc_slice slice, const uint8_t *legal_bits, @@ -52,9 +53,10 @@ static grpc_error *conforms_to(grpc_slice slice, const uint8_t *legal_bits, if ((legal_bits[byte] & (1 << bit)) == 0) { char *dump = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); grpc_error *error = grpc_error_set_str( - grpc_error_set_int(GRPC_ERROR_CREATE(err_desc), GRPC_ERROR_INT_OFFSET, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_desc), + GRPC_ERROR_INT_OFFSET, p - GRPC_SLICE_START_PTR(slice)), - GRPC_ERROR_STR_RAW_BYTES, dump); + GRPC_ERROR_STR_RAW_BYTES, grpc_slice_from_copied_string(dump)); gpr_free(dump); return error; } @@ -74,10 +76,12 @@ grpc_error *grpc_validate_header_key_is_legal(grpc_slice slice) { 0x80, 0xfe, 0xff, 0xff, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; if (GRPC_SLICE_LENGTH(slice) == 0) { - return GRPC_ERROR_CREATE("Metadata keys cannot be zero length"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Metadata keys cannot be zero length"); } if (GRPC_SLICE_START_PTR(slice)[0] == ':') { - return GRPC_ERROR_CREATE("Metadata keys cannot start with :"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Metadata keys cannot start with :"); } return conforms_to(slice, legal_header_bits, "Illegal header key"); } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index afe1f6164d..3757b25267 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -79,7 +79,8 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, *w->current = GRPC_CHANNEL_SHUTDOWN; error = GRPC_ERROR_NONE; } else { - error = GRPC_ERROR_CREATE("Shutdown connectivity owner"); + error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner"); } grpc_closure_sched(exec_ctx, w->notify, error); gpr_free(w); diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c index ef55e561fb..4e70f8749d 100644 --- a/src/core/lib/transport/error_utils.c +++ b/src/core/lib/transport/error_utils.c @@ -55,7 +55,7 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error, } void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, - grpc_status_code *code, const char **msg, + grpc_status_code *code, grpc_slice *slice, grpc_http2_error_code *http_error) { // Start with the parent error and recurse through the tree of children // until we find the first one that has a status code. @@ -97,11 +97,11 @@ void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, // If the error has a status message, use it. Otherwise, fall back to // the error description. - if (msg != NULL) { - *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE); - if (*msg == NULL && error != GRPC_ERROR_NONE) { - *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION); - if (*msg == NULL) *msg = "unknown error"; // Just in case. + if (slice != NULL) { + if (!grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE, slice)) { + if (!grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION, slice)) { + *slice = grpc_slice_from_static_string("unknown error"); + } } } diff --git a/src/core/lib/transport/error_utils.h b/src/core/lib/transport/error_utils.h index 105338880a..3b44466ab8 100644 --- a/src/core/lib/transport/error_utils.h +++ b/src/core/lib/transport/error_utils.h @@ -44,7 +44,7 @@ /// attributes (code, msg, http_status) are unneeded, they can be passed as /// NULL. void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, - grpc_status_code *code, const char **msg, + grpc_status_code *code, grpc_slice *slice, grpc_http2_error_code *http_status); /// A utility function to check whether there is a clear status code that diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c index fc2c52bd8a..fa73244aa4 100644 --- a/src/core/lib/transport/metadata_batch.c +++ b/src/core/lib/transport/metadata_batch.c @@ -101,12 +101,10 @@ void grpc_metadata_batch_destroy(grpc_exec_ctx *exec_ctx, } grpc_error *grpc_attach_md_to_error(grpc_error *src, grpc_mdelem md) { - char *k = grpc_slice_to_c_string(GRPC_MDKEY(md)); - char *v = grpc_slice_to_c_string(GRPC_MDVALUE(md)); grpc_error *out = grpc_error_set_str( - grpc_error_set_str(src, GRPC_ERROR_STR_KEY, k), GRPC_ERROR_STR_VALUE, v); - gpr_free(k); - gpr_free(v); + grpc_error_set_str(src, GRPC_ERROR_STR_KEY, + grpc_slice_ref_internal(GRPC_MDKEY(md))), + GRPC_ERROR_STR_VALUE, grpc_slice_ref_internal(GRPC_MDVALUE(md))); return out; } @@ -126,7 +124,8 @@ static grpc_error *maybe_link_callout(grpc_metadata_batch *batch, return GRPC_ERROR_NONE; } return grpc_attach_md_to_error( - GRPC_ERROR_CREATE("Unallowed duplicate metadata"), storage->md); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unallowed duplicate metadata"), + storage->md); } static void maybe_unlink_callout(grpc_metadata_batch *batch, @@ -302,7 +301,7 @@ static void add_error(grpc_error **composite, grpc_error *error, const char *composite_error_string) { if (error == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE(composite_error_string); + *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(composite_error_string); } *composite = grpc_error_add_child(*composite, error); } diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 12da2a88fe..1195f75044 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -93,6 +93,18 @@ void grpc_service_config_destroy(grpc_service_config* service_config) { gpr_free(service_config); } +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg) { + const grpc_json* json = service_config->json_tree; + if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) return; + if (strcmp(field->key, "methodConfig") == 0) continue; + process_json(field, arg); + } +} + const char* grpc_service_config_get_lb_policy_name( const grpc_service_config* service_config) { const grpc_json* json = service_config->json_tree; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index cd739a593c..ebfc59b534 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -42,6 +42,12 @@ typedef struct grpc_service_config grpc_service_config; grpc_service_config* grpc_service_config_create(const char* json_string); void grpc_service_config_destroy(grpc_service_config* service_config); +/// Invokes \a process_json() for each global parameter in the service +/// config. \a arg is passed as the second argument to \a process_json(). +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg); + /// Gets the LB policy name from \a service_config. /// Returns NULL if no LB policy name was specified. /// Caller does NOT take ownership. |