diff options
author | 2017-03-28 10:13:07 -0700 | |
---|---|---|
committer | 2017-03-28 10:13:07 -0700 | |
commit | b6090a697b6559155ef8ac80d4343204183571d7 (patch) | |
tree | 1fffa6968d3ee46476a1106e70f3730144412bdb /src/core/lib | |
parent | 306efc787a8a224b4fc5b523fd551f402932d14e (diff) | |
parent | 739cecb0bc1f1ba3b2e0b390795cbaf429ec81c2 (diff) |
Merge github.com:grpc/grpc into new_transport_op
Diffstat (limited to 'src/core/lib')
67 files changed, 1177 insertions, 492 deletions
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 42ef7b7806..75c68a5534 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -90,7 +90,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount, args->server_transport_data, args->arena); return r == 0 ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE("transport stream initialization failed"); + : GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "transport stream initialization failed"); } static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 9c484101e8..939ed21677 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -55,9 +55,9 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, if (error != GRPC_ERROR_CANCELLED) { grpc_call_element_signal_error( exec_ctx, elem, - grpc_error_set_int(GRPC_ERROR_CREATE("Deadline Exceeded"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_DEADLINE_EXCEEDED)); + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED)); } GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); } diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 1b4240bb10..5861fa6f54 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -236,8 +236,9 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_handshake_manager* mgr = arg; if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. - grpc_handshake_manager_shutdown(exec_ctx, mgr, - GRPC_ERROR_CREATE("Handshake timed out")); + grpc_handshake_manager_shutdown( + exec_ctx, mgr, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake timed out")); } grpc_handshake_manager_unref(exec_ctx, mgr); } diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index a0cbeaabf3..e43b97335c 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -36,6 +36,7 @@ #include <grpc/support/string_util.h> #include <string.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/security/util/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -56,7 +57,6 @@ typedef struct call_data { grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; grpc_linked_mdelem user_agent; - grpc_linked_mdelem payload_bin; grpc_metadata_batch *recv_initial_metadata; grpc_metadata_batch *recv_trailing_metadata; @@ -108,11 +108,11 @@ static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, grpc_error *e = grpc_error_set_str( grpc_error_set_int( grpc_error_set_str( - GRPC_ERROR_CREATE( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Received http2 :status header with non-200 OK status"), - GRPC_ERROR_STR_VALUE, val), + GRPC_ERROR_STR_VALUE, grpc_slice_from_copied_string(val)), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED), - GRPC_ERROR_STR_GRPC_MESSAGE, msg); + GRPC_ERROR_STR_GRPC_MESSAGE, grpc_slice_from_copied_string(msg)); gpr_free(val); gpr_free(msg); return e; @@ -252,18 +252,13 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { } } -typedef struct hc_mutate_op_result { - grpc_error *error; - bool op_stalled; -} hc_mutate_op_result; - -static hc_mutate_op_result hc_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - hc_mutate_op_result result = {.error = GRPC_ERROR_NONE, .op_stalled = false}; + grpc_error *error; if (op->send_initial_metadata) { /* Decide which HTTP VERB to use. We use GET if the request is marked @@ -298,23 +293,63 @@ static hc_mutate_op_result hc_mutate_op(grpc_exec_ctx *exec_ctx, calld->send_length = op->payload->send_message.send_message->length; calld->send_flags = op->payload->send_message.send_message->flags; continue_send_message(exec_ctx, elem); - result.op_stalled = true; if (calld->send_message_blocked == false) { - /* when all the send_message data is available, then create a MDELEM and - append to headers */ - grpc_mdelem payload_bin = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_GRPC_PAYLOAD_BIN, - grpc_slice_from_copied_buffer( - (const char *)calld->payload_bytes, - op->payload->send_message.send_message->length)); - result.error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->payload_bin, payload_bin); - if (result.error != GRPC_ERROR_NONE) return result; + /* when all the send_message data is available, then modify the path + * MDELEM by appending base64 encoded query to the path */ + const int k_url_safe = 1; + const int k_multi_line = 0; + const unsigned char k_query_separator = '?'; + + grpc_slice path_slice = + GRPC_MDVALUE(op->payload->send_initial_metadata + .send_initial_metadata->idx.named.path->md); + /* sum up individual component's lengths and allocate enough memory to + * hold combined path+query */ + size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); + estimated_len++; /* for the '?' */ + estimated_len += grpc_base64_estimate_encoded_size( + op->payload->send_message.send_message->length, k_url_safe, + k_multi_line); + estimated_len += 1; /* for the trailing 0 */ + grpc_slice path_with_query_slice = grpc_slice_malloc(estimated_len); + + /* memcopy individual pieces into this slice */ + uint8_t *write_ptr = + (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice); + uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); + memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); + write_ptr += GRPC_SLICE_LENGTH(path_slice); + + *write_ptr = k_query_separator; + write_ptr++; /* for the '?' */ + + grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes, + op->payload->send_message.send_message->length, + k_url_safe, k_multi_line); + + /* remove trailing unused memory and add trailing 0 to terminate string + */ + char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); + /* safe to use strlen since base64_encode will always add '\0' */ + size_t path_length = strlen(t) + 1; + *(t + path_length) = '\0'; + path_with_query_slice = + grpc_slice_sub(path_with_query_slice, 0, path_length); + + /* substitute previous path with the new path+query */ + grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); + grpc_metadata_batch *b = + op->payload->send_initial_metadata.send_initial_metadata; + error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + mdelem_path_and_query); + if (error != GRPC_ERROR_NONE) return error; + calld->on_complete = op->on_complete; op->on_complete = &calld->hc_on_complete; op->send_message = NULL; + grpc_slice_unref_internal(exec_ctx, path_with_query_slice); } else { /* Not all data is available. Fall back to POST. */ gpr_log(GPR_DEBUG, @@ -342,26 +377,26 @@ static hc_mutate_op_result hc_mutate_op(grpc_exec_ctx *exec_ctx, /* Send : prefixed headers, which have to be before any application layer headers. */ - result.error = grpc_metadata_batch_add_head( + error = grpc_metadata_batch_add_head( exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, &calld->method, method); - if (result.error != GRPC_ERROR_NONE) return result; - result.error = grpc_metadata_batch_add_head( + if (error != GRPC_ERROR_NONE) return error; + error = grpc_metadata_batch_add_head( exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, &calld->scheme, channeld->static_scheme); - if (result.error != GRPC_ERROR_NONE) return result; - result.error = grpc_metadata_batch_add_tail( + if (error != GRPC_ERROR_NONE) return error; + error = grpc_metadata_batch_add_tail( exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); - if (result.error != GRPC_ERROR_NONE) return result; - result.error = grpc_metadata_batch_add_tail( + if (error != GRPC_ERROR_NONE) return error; + error = grpc_metadata_batch_add_tail( exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); - if (result.error != GRPC_ERROR_NONE) return result; - result.error = grpc_metadata_batch_add_tail( + if (error != GRPC_ERROR_NONE) return error; + error = grpc_metadata_batch_add_tail( exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); - if (result.error != GRPC_ERROR_NONE) return result; + if (error != GRPC_ERROR_NONE) return error; } if (op->recv_initial_metadata) { @@ -382,7 +417,7 @@ static hc_mutate_op_result hc_mutate_op(grpc_exec_ctx *exec_ctx, op->on_complete = &calld->hc_on_recv_trailing_metadata; } - return result; + return GRPC_ERROR_NONE; } static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -390,14 +425,18 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { GPR_TIMER_BEGIN("hc_start_transport_op", 0); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - hc_mutate_op_result result = hc_mutate_op(exec_ctx, elem, op); - if (result.error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, result.error); - } else if (result.op_stalled) { - /* Don't forward the op. send_message contains slices that aren't ready yet. - The call will be forwarded by the op_complete of slice read call. */ + grpc_error *error = hc_mutate_op(exec_ctx, elem, op); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); } else { - grpc_call_next_op(exec_ctx, elem, op); + call_data *calld = elem->call_data; + if (op->send_message && calld->send_message_blocked) { + /* Don't forward the op. send_message contains slices that aren't ready + yet. The call will be forwarded by the op_complete of slice read call. + */ + } else { + grpc_call_next_op(exec_ctx, elem, op); + } } GPR_TIMER_END("hc_start_transport_op", 0); } diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index a1ebc8810a..ac4cdcfa97 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -37,6 +37,7 @@ #include <grpc/support/log.h> #include <string.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/security/util/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -51,8 +52,8 @@ typedef struct call_data { grpc_linked_mdelem status; grpc_linked_mdelem content_type; - /* did this request come with payload-bin */ - bool seen_payload_bin; + /* did this request come with path query containing request payload */ + bool seen_path_with_query; /* flag to ensure payload_bin is delivered only once */ bool payload_bin_delivered; @@ -60,7 +61,7 @@ typedef struct call_data { uint32_t *recv_initial_metadata_flags; /** Closure to call when finished with the hs_on_recv hook */ grpc_closure *on_done_recv; - /** Closure to call when we retrieve read message from the payload-bin header + /** Closure to call when we retrieve read message from the path URI */ grpc_closure *recv_message_ready; grpc_closure *on_complete; @@ -100,7 +101,7 @@ static void add_error(const char *error_name, grpc_error **cumulative, grpc_error *new) { if (new == GRPC_ERROR_NONE) return; if (*cumulative == GRPC_ERROR_NONE) { - *cumulative = GRPC_ERROR_CREATE(error_name); + *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name); } *cumulative = grpc_error_add_child(*cumulative, new); } @@ -131,27 +132,32 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, ~GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; } else { add_error(error_name, &error, - grpc_attach_md_to_error(GRPC_ERROR_CREATE("Bad header"), - b->idx.named.method->md)); + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.method->md)); } grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.method); } else { - add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, ":method")); + add_error( + error_name, &error, + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":method"))); } if (b->idx.named.te != NULL) { if (!grpc_mdelem_eq(b->idx.named.te->md, GRPC_MDELEM_TE_TRAILERS)) { add_error(error_name, &error, - grpc_attach_md_to_error(GRPC_ERROR_CREATE("Bad header"), - b->idx.named.te->md)); + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.te->md)); } grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.te); } else { add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, "te")); + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string("te"))); } if (b->idx.named.scheme != NULL) { @@ -159,14 +165,17 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTPS) && !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_GRPC)) { add_error(error_name, &error, - grpc_attach_md_to_error(GRPC_ERROR_CREATE("Bad header"), - b->idx.named.scheme->md)); + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.scheme->md)); } grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.scheme); } else { - add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, ":scheme")); + add_error( + error_name, &error, + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":scheme"))); } if (b->idx.named.content_type != NULL) { @@ -200,8 +209,46 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, if (b->idx.named.path == NULL) { add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, ":path")); + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path"))); + } else if (*calld->recv_cacheable_request == true) { + /* We have a cacheable request made with GET verb. The path contains the + * query parameter which is base64 encoded request payload. */ + const char k_query_separator = '?'; + grpc_slice path_slice = GRPC_MDVALUE(b->idx.named.path->md); + uint8_t *path_ptr = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); + size_t path_length = GRPC_SLICE_LENGTH(path_slice); + /* offset of the character '?' */ + size_t offset = 0; + for (offset = 0; *path_ptr != k_query_separator && offset < path_length; + path_ptr++, offset++) + ; + if (offset < path_length) { + grpc_slice query_slice = + grpc_slice_sub(path_slice, offset + 1, path_length); + + /* substitute path metadata with just the path (not query) */ + grpc_mdelem mdelem_path_without_query = grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_PATH, grpc_slice_sub(path_slice, 0, offset)); + + grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + mdelem_path_without_query); + + /* decode payload from query and add to the slice buffer to be returned */ + const int k_url_safe = 1; + grpc_slice_buffer_add( + &calld->read_slice_buffer, + grpc_base64_decode(exec_ctx, + (const char *)GRPC_SLICE_START_PTR(query_slice), + k_url_safe)); + grpc_slice_buffer_stream_init(&calld->read_stream, + &calld->read_slice_buffer, 0); + calld->seen_path_with_query = true; + grpc_slice_unref_internal(exec_ctx, query_slice); + } else { + gpr_log(GPR_ERROR, "GET request without QUERY"); + } } if (b->idx.named.host != NULL && b->idx.named.authority == NULL) { @@ -218,19 +265,11 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, } if (b->idx.named.authority == NULL) { - add_error(error_name, &error, - grpc_error_set_str(GRPC_ERROR_CREATE("Missing header"), - GRPC_ERROR_STR_KEY, ":authority")); - } - - if (b->idx.named.grpc_payload_bin != NULL) { - calld->seen_payload_bin = true; - grpc_slice_buffer_add(&calld->read_slice_buffer, - grpc_slice_ref_internal( - GRPC_MDVALUE(b->idx.named.grpc_payload_bin->md))); - grpc_slice_buffer_stream_init(&calld->read_stream, - &calld->read_slice_buffer, 0); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_payload_bin); + add_error( + error_name, &error, + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":authority"))); } return error; @@ -253,8 +292,8 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *err) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - /* Call recv_message_ready if we got the payload via the header field */ - if (calld->seen_payload_bin && calld->recv_message_ready != NULL) { + /* Call recv_message_ready if we got the payload via the path field */ + if (calld->seen_path_with_query && calld->recv_message_ready != NULL) { *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; @@ -269,7 +308,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *err) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (calld->seen_payload_bin) { + if (calld->seen_path_with_query) { /* do nothing. This is probably a GET request, and payload will be returned in hs_on_complete callback. */ } else { diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index c55b0971e9..0873d9c285 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -121,8 +121,8 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, "Received message larger than max (%u vs. %d)", (*calld->recv_message)->length, calld->max_recv_size); grpc_error* new_error = grpc_error_set_int( - GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_INVALID_ARGUMENT); + GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INVALID_ARGUMENT); if (error == GRPC_ERROR_NONE) { error = new_error; } else { @@ -149,9 +149,10 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, op->payload->send_message.send_message->length, calld->max_send_size); grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, grpc_error_set_int(GRPC_ERROR_CREATE(message_string), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_INVALID_ARGUMENT)); + exec_ctx, op, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_INVALID_ARGUMENT)); gpr_free(message_string); return; } diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 6d7aa43b81..453a64b049 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -126,13 +126,15 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, static void append_error(internal_request *req, grpc_error *error) { if (req->overall_error == GRPC_ERROR_NONE) { - req->overall_error = GRPC_ERROR_CREATE("Failed HTTP/1 client request"); + req->overall_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed HTTP/1 client request"); } grpc_resolved_address *addr = &req->addresses->addrs[req->next_address - 1]; char *addr_text = grpc_sockaddr_to_uri(addr); req->overall_error = grpc_error_add_child( req->overall_error, - grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_text)); + grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(addr_text))); gpr_free(addr_text); } @@ -190,8 +192,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, internal_request *req = arg; if (!ep) { - next_address(exec_ctx, req, - GRPC_ERROR_CREATE("Unexplained handshake failure")); + next_address(exec_ctx, req, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Unexplained handshake failure")); return; } @@ -221,8 +223,8 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, } if (req->next_address == req->addresses->naddrs) { finish(exec_ctx, req, - GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets", - &req->overall_error, 1)); + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed HTTP requests to all targets", &req->overall_error, 1)); return; } addr = &req->addresses->addrs[req->next_address++]; diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 354d2f4a09..be6a6d618a 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -95,7 +95,7 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, char *msg; gpr_asprintf(&msg, "Peer name %s is not in peer certificate", c->secure_peer_name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); } grpc_closure_sched(exec_ctx, on_peer_checked, error); diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index b9c56c103c..aac506b800 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -54,26 +54,36 @@ static grpc_error *handle_response_line(grpc_http_parser *parser) { uint8_t *cur = beg; uint8_t *end = beg + parser->cur_line_length; - if (cur == end || *cur++ != 'H') return GRPC_ERROR_CREATE("Expected 'H'"); - if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); - if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); - if (cur == end || *cur++ != 'P') return GRPC_ERROR_CREATE("Expected 'P'"); - if (cur == end || *cur++ != '/') return GRPC_ERROR_CREATE("Expected '/'"); - if (cur == end || *cur++ != '1') return GRPC_ERROR_CREATE("Expected '1'"); - if (cur == end || *cur++ != '.') return GRPC_ERROR_CREATE("Expected '.'"); + if (cur == end || *cur++ != 'H') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'H'"); + if (cur == end || *cur++ != 'T') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'T'"); + if (cur == end || *cur++ != 'T') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'T'"); + if (cur == end || *cur++ != 'P') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'P'"); + if (cur == end || *cur++ != '/') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected '/'"); + if (cur == end || *cur++ != '1') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected '1'"); + if (cur == end || *cur++ != '.') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected '.'"); if (cur == end || *cur < '0' || *cur++ > '1') { - return GRPC_ERROR_CREATE("Expected HTTP/1.0 or HTTP/1.1"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Expected HTTP/1.0 or HTTP/1.1"); } - if (cur == end || *cur++ != ' ') return GRPC_ERROR_CREATE("Expected ' '"); + if (cur == end || *cur++ != ' ') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected ' '"); if (cur == end || *cur < '1' || *cur++ > '9') - return GRPC_ERROR_CREATE("Expected status code"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected status code"); if (cur == end || *cur < '0' || *cur++ > '9') - return GRPC_ERROR_CREATE("Expected status code"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected status code"); if (cur == end || *cur < '0' || *cur++ > '9') - return GRPC_ERROR_CREATE("Expected status code"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected status code"); parser->http.response->status = (cur[-3] - '0') * 100 + (cur[-2] - '0') * 10 + (cur[-1] - '0'); - if (cur == end || *cur++ != ' ') return GRPC_ERROR_CREATE("Expected ' '"); + if (cur == end || *cur++ != ' ') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected ' '"); /* we don't really care about the status code message */ @@ -89,24 +99,33 @@ static grpc_error *handle_request_line(grpc_http_parser *parser) { while (cur != end && *cur++ != ' ') ; - if (cur == end) return GRPC_ERROR_CREATE("No method on HTTP request line"); + if (cur == end) + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No method on HTTP request line"); parser->http.request->method = buf2str(beg, (size_t)(cur - beg - 1)); beg = cur; while (cur != end && *cur++ != ' ') ; - if (cur == end) return GRPC_ERROR_CREATE("No path on HTTP request line"); + if (cur == end) + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No path on HTTP request line"); parser->http.request->path = buf2str(beg, (size_t)(cur - beg - 1)); - if (cur == end || *cur++ != 'H') return GRPC_ERROR_CREATE("Expected 'H'"); - if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); - if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); - if (cur == end || *cur++ != 'P') return GRPC_ERROR_CREATE("Expected 'P'"); - if (cur == end || *cur++ != '/') return GRPC_ERROR_CREATE("Expected '/'"); + if (cur == end || *cur++ != 'H') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'H'"); + if (cur == end || *cur++ != 'T') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'T'"); + if (cur == end || *cur++ != 'T') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'T'"); + if (cur == end || *cur++ != 'P') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected 'P'"); + if (cur == end || *cur++ != '/') + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Expected '/'"); vers_major = (uint8_t)(*cur++ - '1' + 1); ++cur; if (cur == end) - return GRPC_ERROR_CREATE("End of line in HTTP version string"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "End of line in HTTP version string"); vers_minor = (uint8_t)(*cur++ - '1' + 1); if (vers_major == 1) { @@ -115,18 +134,19 @@ static grpc_error *handle_request_line(grpc_http_parser *parser) { } else if (vers_minor == 1) { parser->http.request->version = GRPC_HTTP_HTTP11; } else { - return GRPC_ERROR_CREATE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); } } else if (vers_major == 2) { if (vers_minor == 0) { parser->http.request->version = GRPC_HTTP_HTTP20; } else { - return GRPC_ERROR_CREATE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); } } else { - return GRPC_ERROR_CREATE("Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); } return GRPC_ERROR_NONE; @@ -139,7 +159,8 @@ static grpc_error *handle_first_line(grpc_http_parser *parser) { case GRPC_HTTP_RESPONSE: return handle_response_line(parser); } - GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); + GPR_UNREACHABLE_CODE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here")); } static grpc_error *add_header(grpc_http_parser *parser) { @@ -154,7 +175,8 @@ static grpc_error *add_header(grpc_http_parser *parser) { GPR_ASSERT(cur != end); if (*cur == ' ' || *cur == '\t') { - error = GRPC_ERROR_CREATE("Continued header lines not supported yet"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Continued header lines not supported yet"); goto done; } @@ -162,7 +184,8 @@ static grpc_error *add_header(grpc_http_parser *parser) { cur++; } if (cur == end) { - error = GRPC_ERROR_CREATE("Didn't find ':' in header string"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Didn't find ':' in header string"); goto done; } GPR_ASSERT(cur >= beg); @@ -222,7 +245,8 @@ static grpc_error *finish_line(grpc_http_parser *parser, } break; case GRPC_HTTP_BODY: - GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); + GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Should never reach here")); } parser->cur_line_length = 0; @@ -240,7 +264,8 @@ static grpc_error *addbyte_body(grpc_http_parser *parser, uint8_t byte) { body_length = &parser->http.request->body_length; body = &parser->http.request->body; } else { - GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); + GPR_UNREACHABLE_CODE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here")); } if (*body_length == parser->body_capacity) { @@ -286,7 +311,8 @@ static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte, if (grpc_http1_trace) gpr_log(GPR_ERROR, "HTTP header max line length (%d) exceeded", GRPC_HTTP_PARSER_MAX_HEADER_LENGTH); - return GRPC_ERROR_CREATE("HTTP header max line length exceeded"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "HTTP header max line length exceeded"); } parser->cur_line[parser->cur_line_length] = byte; parser->cur_line_length++; @@ -347,7 +373,7 @@ grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, grpc_slice slice, grpc_error *grpc_http_parser_eof(grpc_http_parser *parser) { if (parser->state != GRPC_HTTP_BODY) { - return GRPC_ERROR_CREATE("Did not finish headers"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Did not finish headers"); } return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 509c1ff95d..6633fb68ec 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -33,6 +33,7 @@ #include "src/core/lib/iomgr/closure.h" +#include <assert.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -124,6 +125,7 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_run", 0); if (c != NULL) { + assert(c->cb); c->scheduler->vtable->run(exec_ctx, c, error); } else { GRPC_ERROR_UNREF(error); @@ -135,6 +137,7 @@ void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_sched", 0); if (c != NULL) { + assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, error); } else { GRPC_ERROR_UNREF(error); @@ -146,6 +149,7 @@ void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { grpc_closure *c = list->head; while (c != NULL) { grpc_closure *next = c->next_data.next; + assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); c = next; } diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index fa9966c3a6..2bc476bbef 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -33,6 +33,7 @@ #include "src/core/lib/iomgr/combiner.h" +#include <assert.h> #include <string.h> #include <grpc/support/alloc.h> @@ -216,6 +217,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, cl, covered_by_poller, last)); GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + assert(cl->cb); cl->error_data.scratch = pack_error_data((error_data){error, covered_by_poller}); if (covered_by_poller) { diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 1127fff756..1dbb64e8f3 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -35,7 +35,6 @@ #include <string.h> -#include <grpc/slice.h> #include <grpc/status.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -287,7 +286,7 @@ static void internal_add_error(grpc_error **err, grpc_error *new) { // It is very common to include and extra int and string in an error #define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME) -grpc_error *grpc_error_create(const char *file, int line, const char *desc, +grpc_error *grpc_error_create(grpc_slice file, int line, grpc_slice desc, grpc_error **referencing, size_t num_referencing) { GPR_TIMER_BEGIN("grpc_error_create", 0); @@ -313,14 +312,8 @@ grpc_error *grpc_error_create(const char *file, int line, const char *desc, memset(err->times, UINT8_MAX, GRPC_ERROR_TIME_MAX); internal_set_int(&err, GRPC_ERROR_INT_FILE_LINE, line); - internal_set_str(&err, GRPC_ERROR_STR_FILE, - grpc_slice_from_static_string(file)); - internal_set_str( - &err, GRPC_ERROR_STR_DESCRIPTION, - grpc_slice_from_copied_buffer( - desc, - strlen(desc) + - 1)); // TODO, pull this up. // TODO(ncteisen), pull this up. + internal_set_str(&err, GRPC_ERROR_STR_FILE, file); + internal_set_str(&err, GRPC_ERROR_STR_DESCRIPTION, desc); for (size_t i = 0; i < num_referencing; ++i) { if (referencing[i] == GRPC_ERROR_NONE) continue; @@ -360,7 +353,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) { GPR_TIMER_BEGIN("copy_error_and_unref", 0); grpc_error *out; if (grpc_error_is_special(in)) { - out = GRPC_ERROR_CREATE("unknown"); + out = GRPC_ERROR_CREATE_FROM_STATIC_STRING("unknown"); if (in == GRPC_ERROR_NONE) { internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, grpc_slice_from_static_string("no error")); @@ -417,7 +410,7 @@ typedef struct { const char *msg; } special_error_status_map; static special_error_status_map error_status_map[] = { - {GRPC_ERROR_NONE, GRPC_STATUS_OK, NULL}, + {GRPC_ERROR_NONE, GRPC_STATUS_OK, ""}, {GRPC_ERROR_CANCELLED, GRPC_STATUS_CANCELLED, "Cancelled"}, {GRPC_ERROR_OOM, GRPC_STATUS_RESOURCE_EXHAUSTED, "Out of memory"}, }; @@ -448,33 +441,33 @@ bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { } grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, - const char *value) { + grpc_slice str) { GPR_TIMER_BEGIN("grpc_error_set_str", 0); grpc_error *new = copy_error_and_unref(src); - internal_set_str(&new, which, - grpc_slice_from_copied_buffer( - value, strlen(value) + 1)); // TODO, pull this up. + internal_set_str(&new, which, str); GPR_TIMER_END("grpc_error_set_str", 0); return new; } -const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) { +bool grpc_error_get_str(grpc_error *err, grpc_error_strs which, + grpc_slice *str) { if (grpc_error_is_special(err)) { if (which == GRPC_ERROR_STR_GRPC_MESSAGE) { for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); i++) { if (error_status_map[i].error == err) { - return error_status_map[i].msg; + *str = grpc_slice_from_static_string(error_status_map[i].msg); + return true; } } } - return NULL; + return false; } uint8_t slot = err->strs[which]; if (slot != UINT8_MAX) { - return (const char *)GRPC_SLICE_START_PTR( - *(grpc_slice *)(err->arena + slot)); + *str = *(grpc_slice *)(err->arena + slot); + return true; } else { - return NULL; + return false; } } @@ -515,13 +508,14 @@ static void append_str(const char *str, char **s, size_t *sz, size_t *cap) { } } -static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) { +static void append_esc_str(const uint8_t *str, size_t len, char **s, size_t *sz, + size_t *cap) { static const char *hex = "0123456789abcdef"; append_chr('"', s, sz, cap); - for (const uint8_t *c = (const uint8_t *)str; *c; c++) { - if (*c < 32 || *c >= 127) { + for (size_t i = 0; i < len; i++, str++) { + if (*str < 32 || *str >= 127) { append_chr('\\', s, sz, cap); - switch (*c) { + switch (*str) { case '\b': append_chr('b', s, sz, cap); break; @@ -541,12 +535,12 @@ static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) { append_chr('u', s, sz, cap); append_chr('0', s, sz, cap); append_chr('0', s, sz, cap); - append_chr(hex[*c >> 4], s, sz, cap); - append_chr(hex[*c & 0x0f], s, sz, cap); + append_chr(hex[*str >> 4], s, sz, cap); + append_chr(hex[*str & 0x0f], s, sz, cap); break; } } else { - append_chr((char)*c, s, sz, cap); + append_chr((char)*str, s, sz, cap); } } append_chr('"', s, sz, cap); @@ -586,11 +580,12 @@ static char *key_str(grpc_error_strs which) { return gpr_strdup(error_str_name(which)); } -static char *fmt_str(void *p) { +static char *fmt_str(grpc_slice slice) { char *s = NULL; size_t sz = 0; size_t cap = 0; - append_esc_str(p, &s, &sz, &cap); + append_esc_str((const uint8_t *)GRPC_SLICE_START_PTR(slice), + GRPC_SLICE_LENGTH(slice), &s, &sz, &cap); append_chr(0, &s, &sz, &cap); return s; } @@ -599,9 +594,8 @@ static void collect_strs_kvs(grpc_error *err, kv_pairs *kvs) { for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) { uint8_t slot = err->strs[which]; if (slot != UINT8_MAX) { - append_kv( - kvs, key_str((grpc_error_strs)which), - fmt_str(GRPC_SLICE_START_PTR(*(grpc_slice *)(err->arena + slot)))); + append_kv(kvs, key_str((grpc_error_strs)which), + fmt_str(*(grpc_slice *)(err->arena + slot))); } } } @@ -681,7 +675,8 @@ static char *finish_kvs(kv_pairs *kvs) { append_chr('{', &s, &sz, &cap); for (size_t i = 0; i < kvs->num_kvs; i++) { if (i != 0) append_chr(',', &s, &sz, &cap); - append_esc_str(kvs->kvs[i].key, &s, &sz, &cap); + append_esc_str((const uint8_t *)kvs->kvs[i].key, strlen(kvs->kvs[i].key), + &s, &sz, &cap); gpr_free(kvs->kvs[i].key); append_chr(':', &s, &sz, &cap); append_str(kvs->kvs[i].value, &s, &sz, &cap); @@ -733,10 +728,14 @@ grpc_error *grpc_os_error(const char *file, int line, int err, const char *call_name) { return grpc_error_set_str( grpc_error_set_str( - grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0), - GRPC_ERROR_INT_ERRNO, err), - GRPC_ERROR_STR_OS_ERROR, strerror(err)), - GRPC_ERROR_STR_SYSCALL, call_name); + grpc_error_set_int( + grpc_error_create(grpc_slice_from_static_string(file), line, + grpc_slice_from_static_string("OS Error"), NULL, + 0), + GRPC_ERROR_INT_ERRNO, err), + GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(strerror(err))), + GRPC_ERROR_STR_SYSCALL, grpc_slice_from_static_string(call_name)); } #ifdef GPR_WINDOWS @@ -745,10 +744,13 @@ grpc_error *grpc_wsa_error(const char *file, int line, int err, char *utf8_message = gpr_format_message(err); grpc_error *error = grpc_error_set_str( grpc_error_set_str( - grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0), - GRPC_ERROR_INT_WSA_ERROR, err), - GRPC_ERROR_STR_OS_ERROR, utf8_message), - GRPC_ERROR_STR_SYSCALL, call_name); + grpc_error_set_int( + grpc_error_create(grpc_slice_from_static_string(file), line, + grpc_slice_from_static_string("OS Error"), NULL, + 0), + GRPC_ERROR_INT_WSA_ERROR, err), + GRPC_ERROR_STR_OS_ERROR, grpc_slice_from_copied_string(utf8_message)), + GRPC_ERROR_STR_SYSCALL, grpc_slice_from_static_string(call_name)); gpr_free(utf8_message); return error; } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index eb953947ae..2a44fcfe25 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -37,6 +37,7 @@ #include <stdbool.h> #include <stdint.h> +#include <grpc/slice.h> #include <grpc/status.h> #include <grpc/support/time.h> @@ -45,28 +46,9 @@ extern "C" { #endif /// Opaque representation of an error. -/// Errors are refcounted objects that represent the result of an operation. -/// Ownership laws: -/// if a grpc_error is returned by a function, the caller owns a ref to that -/// instance -/// if a grpc_error is passed to a grpc_closure callback function (functions -/// with the signature: -/// void (*f)(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error)) -/// then those functions do not own a ref to error (but are free to manually -/// take a reference). -/// if a grpc_error is passed to *ANY OTHER FUNCTION* then that function takes -/// ownership of the error -/// Errors have: -/// a set of ints, strings, and timestamps that describe the error -/// always present are: -/// GRPC_ERROR_STR_FILE, GRPC_ERROR_INT_FILE_LINE - source location the error -/// was generated -/// GRPC_ERROR_STR_DESCRIPTION - a human readable description of the error -/// GRPC_ERROR_TIME_CREATED - a timestamp indicating when the error happened -/// an error can also have children; these are other errors that are believed -/// to have contributed to this one. By accumulating children, we can begin -/// to root cause high level failures from low level failures, without having -/// to derive execution paths from log lines +/// See https://github.com/grpc/grpc/blob/master/doc/core/grpc-error.md for a +/// full write up of this object. + typedef struct grpc_error grpc_error; typedef enum { @@ -156,7 +138,7 @@ typedef enum { const char *grpc_error_string(grpc_error *error); /// Create an error - but use GRPC_ERROR_CREATE instead -grpc_error *grpc_error_create(const char *file, int line, const char *desc, +grpc_error *grpc_error_create(grpc_slice file, int line, grpc_slice desc, grpc_error **referencing, size_t num_referencing); /// Create an error (this is the preferred way of generating an error that is /// not due to a system call - for system calls, use GRPC_OS_ERROR or @@ -166,13 +148,21 @@ grpc_error *grpc_error_create(const char *file, int line, const char *desc, /// err = grpc_error_create(x, y, z, r, nr) is equivalent to: /// err = grpc_error_create(x, y, z, NULL, 0); /// for (i=0; i<nr; i++) err = grpc_error_add_child(err, r[i]); -#define GRPC_ERROR_CREATE(desc) \ - grpc_error_create(__FILE__, __LINE__, desc, NULL, 0) +#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc) \ + grpc_error_create(grpc_slice_from_static_string(__FILE__), __LINE__, \ + grpc_slice_from_static_string(desc), NULL, 0) +#define GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc) \ + grpc_error_create(grpc_slice_from_static_string(__FILE__), __LINE__, \ + grpc_slice_from_copied_string(desc), NULL, 0) // Create an error that references some other errors. This function adds a // reference to each error in errs - it does not consume an existing reference -#define GRPC_ERROR_CREATE_REFERENCING(desc, errs, count) \ - grpc_error_create(__FILE__, __LINE__, desc, errs, count) +#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count) \ + grpc_error_create(grpc_slice_from_static_string(__FILE__), __LINE__, \ + grpc_slice_from_static_string(desc), errs, count) +#define GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(desc, errs, count) \ + grpc_error_create(grpc_slice_from_static_string(__FILE__), __LINE__, \ + grpc_slice_from_copied_string(desc), errs, count) //#define GRPC_ERROR_REFCOUNT_DEBUG #ifdef GRPC_ERROR_REFCOUNT_DEBUG @@ -194,10 +184,11 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p); grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, - const char *value) GRPC_MUST_USE_RESULT; -/// Returns NULL if the specified string is not set. -/// Caller does NOT own return value. -const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which); + grpc_slice str) GRPC_MUST_USE_RESULT; +/// Returns false if the specified string is not set. +/// Caller does NOT own the slice. +bool grpc_error_get_str(grpc_error *error, grpc_error_strs which, + grpc_slice *s); /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 11208b9ad1..1924e76f13 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -321,7 +321,7 @@ static bool append_error(grpc_error **composite, grpc_error *error, const char *desc) { if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE(desc); + *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc); } *composite = grpc_error_add_child(*composite, error); return false; @@ -1146,9 +1146,9 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, schedule the closure with the shutdown error */ if ((curr & FD_SHUTDOWN_BIT) > 0) { grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT); - grpc_closure_sched( - exec_ctx, closure, - GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); + grpc_closure_sched(exec_ctx, closure, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD Shutdown", &shutdown_err, 1)); return; } @@ -1203,9 +1203,9 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, notify_on to ensure that the closure it schedules 'happens-after' the set_shutdown is called on the fd */ if (gpr_atm_rel_cas(state, curr, new_state)) { - grpc_closure_sched( - exec_ctx, (grpc_closure *)curr, - GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); + grpc_closure_sched(exec_ctx, (grpc_closure *)curr, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD Shutdown", &shutdown_err, 1)); return; } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 5ddd5313e2..ca6e855611 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -451,14 +451,16 @@ static grpc_error *fd_shutdown_error(grpc_fd *fd) { if (!fd->shutdown) { return GRPC_ERROR_NONE; } else { - return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); + return GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD shutdown", &fd->shutdown_error, 1); } } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (fd->shutdown) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown")); + grpc_closure_sched(exec_ctx, closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown")); } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; @@ -696,7 +698,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { static void kick_append_error(grpc_error **composite, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE("Kick Failure"); + *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure"); } *composite = grpc_error_add_child(*composite, error); } @@ -859,7 +861,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { static void work_combine_error(grpc_error **composite, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE("pollset_work"); + *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work"); } *composite = grpc_error_add_child(*composite, error); } @@ -1421,7 +1423,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { g_cvfds.pollcount++; opt = gpr_thd_options_default(); gpr_thd_options_set_detached(&opt); - gpr_thd_new(&t_id, &run_poll, pargs, &opt); + GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt)); // We want the poll() thread to trigger the deadline, so wait forever here gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index a5b62aa888..ae3e2eabc3 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -115,8 +115,8 @@ static void maybe_spawn_locked() { /* All previous instances of the thread should have been joined at this point. * Spawn time! */ g_executor.busy = 1; - gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, - &g_executor.options); + GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, + &g_executor.options)); g_executor.pending_join = 1; } diff --git a/src/core/lib/iomgr/load_file.c b/src/core/lib/iomgr/load_file.c index f40c8b28cc..208f74e20c 100644 --- a/src/core/lib/iomgr/load_file.c +++ b/src/core/lib/iomgr/load_file.c @@ -78,9 +78,12 @@ end: *output = result; if (file != NULL) fclose(file); if (error != GRPC_ERROR_NONE) { - grpc_error *error_out = grpc_error_set_str( - GRPC_ERROR_CREATE_REFERENCING("Failed to load file", &error, 1), - GRPC_ERROR_STR_FILENAME, filename); + grpc_error *error_out = + grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to load file", &error, 1), + GRPC_ERROR_STR_FILENAME, + grpc_slice_from_copied_string( + filename)); // TODO(ncteisen), always static? GRPC_ERROR_UNREF(error); error = error_out; } diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 50e470d149..d0ede0f2d5 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -73,14 +73,16 @@ static grpc_error *blocking_resolve_address_impl( /* parse name, splitting it into host and port parts */ gpr_split_host_port(name, &host, &port); if (host == NULL) { - err = grpc_error_set_str(GRPC_ERROR_CREATE("unparseable host:port"), - GRPC_ERROR_STR_TARGET_ADDRESS, name); + err = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("unparseable host:port"), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); goto done; } if (port == NULL) { if (default_port == NULL) { - err = grpc_error_set_str(GRPC_ERROR_CREATE("no port in name"), - GRPC_ERROR_STR_TARGET_ADDRESS, name); + err = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("no port in name"), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); goto done; } port = gpr_strdup(default_port); @@ -112,11 +114,15 @@ static grpc_error *blocking_resolve_address_impl( if (s != 0) { err = grpc_error_set_str( grpc_error_set_str( - grpc_error_set_str(grpc_error_set_int(GRPC_ERROR_CREATE("OS Error"), - GRPC_ERROR_INT_ERRNO, s), - GRPC_ERROR_STR_OS_ERROR, gai_strerror(s)), - GRPC_ERROR_STR_SYSCALL, "getaddrinfo"), - GRPC_ERROR_STR_TARGET_ADDRESS, name); + grpc_error_set_str( + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("OS Error"), + GRPC_ERROR_INT_ERRNO, s), + GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(gai_strerror(s))), + GRPC_ERROR_STR_SYSCALL, + grpc_slice_from_static_string("getaddrinfo")), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); goto done; } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 4d715be94c..102d1aa290 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -92,9 +92,10 @@ static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result, if (status != 0) { grpc_error *error; *addresses = NULL; - error = GRPC_ERROR_CREATE("getaddrinfo failed"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); return error; } (*addresses) = gpr_malloc(sizeof(grpc_resolved_addresses)); @@ -153,7 +154,7 @@ static grpc_error *try_split_host_port(const char *name, if (*host == NULL) { char *msg; gpr_asprintf(&msg, "unparseable host:port: '%s'", name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return error; } @@ -162,7 +163,7 @@ static grpc_error *try_split_host_port(const char *name, if (default_port == NULL) { char *msg; gpr_asprintf(&msg, "no port in name '%s'", name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return error; } @@ -262,8 +263,9 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, if (s != 0) { *addrs = NULL; - err = GRPC_ERROR_CREATE("getaddrinfo failed"); - err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, uv_strerror(s)); + err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed"); + err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(s))); grpc_closure_sched(exec_ctx, on_done, err); gpr_free(r); gpr_free(req); diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 2439ce3cb7..22eca1fd3b 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -78,7 +78,7 @@ static grpc_error *blocking_resolve_address_impl( if (host == NULL) { char *msg; gpr_asprintf(&msg, "unparseable host:port: '%s'", name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); goto done; } @@ -86,7 +86,7 @@ static grpc_error *blocking_resolve_address_impl( if (default_port == NULL) { char *msg; gpr_asprintf(&msg, "no port in name '%s'", name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); goto done; } diff --git a/src/core/lib/iomgr/socket_factory_posix.c b/src/core/lib/iomgr/socket_factory_posix.c new file mode 100644 index 0000000000..1050a14c46 --- /dev/null +++ b/src/core/lib/iomgr/socket_factory_posix.c @@ -0,0 +1,110 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET + +#include "src/core/lib/iomgr/socket_factory_posix.h" + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +void grpc_socket_factory_init(grpc_socket_factory *factory, + const grpc_socket_factory_vtable *vtable) { + factory->vtable = vtable; + gpr_ref_init(&factory->refcount, 1); +} + +int grpc_socket_factory_socket(grpc_socket_factory *factory, int domain, + int type, int protocol) { + return factory->vtable->socket(factory, domain, type, protocol); +} + +int grpc_socket_factory_bind(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr) { + return factory->vtable->bind(factory, sockfd, addr); +} + +int grpc_socket_factory_compare(grpc_socket_factory *a, + grpc_socket_factory *b) { + int c = GPR_ICMP(a, b); + if (c != 0) { + grpc_socket_factory *sma = a; + grpc_socket_factory *smb = b; + c = GPR_ICMP(sma->vtable, smb->vtable); + if (c == 0) { + c = sma->vtable->compare(sma, smb); + } + } + return c; +} + +grpc_socket_factory *grpc_socket_factory_ref(grpc_socket_factory *factory) { + gpr_ref(&factory->refcount); + return factory; +} + +void grpc_socket_factory_unref(grpc_socket_factory *factory) { + if (gpr_unref(&factory->refcount)) { + factory->vtable->destroy(factory); + } +} + +static void *socket_factory_arg_copy(void *p) { + return grpc_socket_factory_ref(p); +} + +static void socket_factory_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { + grpc_socket_factory_unref(p); +} + +static int socket_factory_cmp(void *a, void *b) { + return grpc_socket_factory_compare((grpc_socket_factory *)a, + (grpc_socket_factory *)b); +} + +static const grpc_arg_pointer_vtable socket_factory_arg_vtable = { + socket_factory_arg_copy, socket_factory_arg_destroy, socket_factory_cmp}; + +grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory) { + grpc_arg arg; + arg.type = GRPC_ARG_POINTER; + arg.key = GRPC_ARG_SOCKET_FACTORY; + arg.value.pointer.vtable = &socket_factory_arg_vtable; + arg.value.pointer.p = factory; + return arg; +} + +#endif diff --git a/src/core/lib/iomgr/socket_factory_posix.h b/src/core/lib/iomgr/socket_factory_posix.h new file mode 100644 index 0000000000..2c63299030 --- /dev/null +++ b/src/core/lib/iomgr/socket_factory_posix.h @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H +#define GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/sync.h> +#include "src/core/lib/iomgr/resolve_address.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** The virtual table of grpc_socket_factory */ +typedef struct { + /** Replacement for socket(2) */ + int (*socket)(grpc_socket_factory *factory, int domain, int type, + int protocol); + /** Replacement for bind(2) */ + int (*bind)(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr); + /** Compare socket factory \a a and \a b */ + int (*compare)(grpc_socket_factory *a, grpc_socket_factory *b); + /** Destroys the socket factory instance */ + void (*destroy)(grpc_socket_factory *factory); +} grpc_socket_factory_vtable; + +/** The Socket Factory interface allows changes on socket options */ +struct grpc_socket_factory { + const grpc_socket_factory_vtable *vtable; + gpr_refcount refcount; +}; + +/** called by concrete implementations to initialize the base struct */ +void grpc_socket_factory_init(grpc_socket_factory *factory, + const grpc_socket_factory_vtable *vtable); + +/** Wrap \a factory as a grpc_arg */ +grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory); + +/** Perform the equivalent of a socket(2) operation using \a factory */ +int grpc_socket_factory_socket(grpc_socket_factory *factory, int domain, + int type, int protocol); + +/** Perform the equivalent of a bind(2) operation using \a factory */ +int grpc_socket_factory_bind(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr); + +/** Compare if \a a and \a b are the same factory or have same settings */ +int grpc_socket_factory_compare(grpc_socket_factory *a, grpc_socket_factory *b); + +grpc_socket_factory *grpc_socket_factory_ref(grpc_socket_factory *factory); +void grpc_socket_factory_unref(grpc_socket_factory *factory); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H */ diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index 88e9ade253..bbe642d0fb 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -90,7 +90,7 @@ grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) { return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)"); } if ((newval != 0) != (val != 0)) { - return GRPC_ERROR_CREATE("Failed to set SO_NOSIGPIPE"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set SO_NOSIGPIPE"); } #endif return GRPC_ERROR_NONE; @@ -164,7 +164,7 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) { return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEADDR)"); } if ((newval != 0) != val) { - return GRPC_ERROR_CREATE("Failed to set SO_REUSEADDR"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set SO_REUSEADDR"); } return GRPC_ERROR_NONE; @@ -173,7 +173,8 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) { /* set a socket to reuse old addresses */ grpc_error *grpc_set_socket_reuse_port(int fd, int reuse) { #ifndef SO_REUSEPORT - return GRPC_ERROR_CREATE("SO_REUSEPORT unavailable on compiling system"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "SO_REUSEPORT unavailable on compiling system"); #else int val = (reuse != 0); int newval; @@ -185,7 +186,7 @@ grpc_error *grpc_set_socket_reuse_port(int fd, int reuse) { return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEPORT)"); } if ((newval != 0) != val) { - return GRPC_ERROR_CREATE("Failed to set SO_REUSEPORT"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set SO_REUSEPORT"); } return GRPC_ERROR_NONE; @@ -204,7 +205,7 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { return GRPC_OS_ERROR(errno, "getsockopt(TCP_NODELAY)"); } if ((newval != 0) != val) { - return GRPC_ERROR_CREATE("Failed to set TCP_NODELAY"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set TCP_NODELAY"); } return GRPC_ERROR_NONE; } @@ -213,7 +214,7 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator) { GPR_ASSERT(mutator); if (!grpc_socket_mutator_mutate_fd(mutator, fd)) { - return GRPC_ERROR_CREATE("grpc_socket_mutator failed."); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_socket_mutator failed."); } return GRPC_ERROR_NONE; } @@ -268,7 +269,8 @@ static grpc_error *error_for_fd(int fd, const grpc_resolved_address *addr) { char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); grpc_error *err = grpc_error_set_str(GRPC_OS_ERROR(errno, "socket"), - GRPC_ERROR_STR_TARGET_ADDRESS, addr_str); + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(addr_str)); gpr_free(addr_str); return err; } @@ -276,11 +278,25 @@ static grpc_error *error_for_fd(int fd, const grpc_resolved_address *addr) { grpc_error *grpc_create_dualstack_socket( const grpc_resolved_address *resolved_addr, int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd) { + return grpc_create_dualstack_socket_using_factory(NULL, resolved_addr, type, + protocol, dsmode, newfd); +} + +static int create_socket(grpc_socket_factory *factory, int domain, int type, + int protocol) { + return (factory != NULL) + ? grpc_socket_factory_socket(factory, domain, type, protocol) + : socket(domain, type, protocol); +} + +grpc_error *grpc_create_dualstack_socket_using_factory( + grpc_socket_factory *factory, const grpc_resolved_address *resolved_addr, + int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd) { const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; int family = addr->sa_family; if (family == AF_INET6) { if (grpc_ipv6_loopback_available()) { - *newfd = socket(family, type, protocol); + *newfd = create_socket(factory, family, type, protocol); } else { *newfd = -1; errno = EAFNOSUPPORT; @@ -302,7 +318,7 @@ grpc_error *grpc_create_dualstack_socket( family = AF_INET; } *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; - *newfd = socket(family, type, protocol); + *newfd = create_socket(factory, family, type, protocol); return error_for_fd(*newfd, resolved_addr); } diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index e84d3781a1..2c2fc95ff9 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -41,6 +41,7 @@ #include <grpc/impl/codegen/grpc_types.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_mutator.h" /* a wrapper for accept or accept4 */ @@ -137,4 +138,10 @@ grpc_error *grpc_create_dualstack_socket(const grpc_resolved_address *addr, grpc_dualstack_mode *dsmode, int *newfd); +/* Same as grpc_create_dualstack_socket(), but use the given socket factory (if + non-null) to create the socket, rather than calling socket() directly. */ +grpc_error *grpc_create_dualstack_socket_using_factory( + grpc_socket_factory *factory, const grpc_resolved_address *addr, int type, + int protocol, grpc_dualstack_mode *dsmode, int *newfd); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 0144192b71..a108b10da6 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -121,8 +121,8 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { - grpc_fd_shutdown(exec_ctx, ac->fd, - GRPC_ERROR_CREATE("connect() timed out")); + grpc_fd_shutdown(exec_ctx, ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "connect() timed out")); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); @@ -191,7 +191,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { gpr_mu_lock(&ac->mu); if (error != GRPC_ERROR_NONE) { error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred"); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string("Timeout occurred")); goto finish; } @@ -252,12 +253,17 @@ finish: gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { char *error_descr; - gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", - grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION)); - error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, error_descr); + grpc_slice str; + bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str); + GPR_ASSERT(ret); + char *desc = grpc_slice_to_c_string(str); + gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", desc); + error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_copied_string(error_descr)); gpr_free(error_descr); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str); + gpr_free(desc); + error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(ac->addr_str)); } if (done) { gpr_mu_destroy(&ac->mu); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 618483d9cb..682c24ed56 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -100,17 +100,21 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { *connect->endpoint = grpc_tcp_create( connect->tcp_handle, connect->resource_quota, connect->addr_name); } else { - error = GRPC_ERROR_CREATE("Failed to connect to remote host"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to connect to remote host"); error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); if (status == UV_ECANCELED) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - "Timeout occurred"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string("Timeout occurred")); // This should only happen if the handle is already closed } else { - error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - uv_strerror(status)); + error = grpc_error_set_str( + error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback); } } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index c8dc9e64bd..a356564766 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -123,7 +123,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { socket = NULL; } } else { - error = GRPC_ERROR_CREATE("socket is null"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("socket is null"); } } @@ -238,8 +238,9 @@ failure: GPR_ASSERT(error != GRPC_ERROR_NONE); char *target_uri = grpc_sockaddr_to_uri(addr); grpc_error *final_error = grpc_error_set_str( - GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &error, 1), - GRPC_ERROR_STR_TARGET_ADDRESS, target_uri); + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Failed to connect", + &error, 1), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(target_uri)); GRPC_ERROR_UNREF(error); if (socket != NULL) { grpc_winsocket_destroy(socket); diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index a4381f8fc9..4d7cf3ff51 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -111,7 +111,8 @@ typedef struct { static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), - GRPC_ERROR_STR_TARGET_ADDRESS, tcp->peer_string); + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(tcp->peer_string)); } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -246,8 +247,10 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, - tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp)); + call_read_cb( + exec_ctx, tcp, + tcp_annotate_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); @@ -464,10 +467,12 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_closure_sched(exec_ctx, cb, - grpc_fd_is_shutdown(tcp->em_fd) - ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp) - : GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, cb, + grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), + tcp) + : GRPC_ERROR_NONE); return; } tcp->outgoing_buffer = buf; diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index b3644518f5..d6a017cf7f 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -100,8 +100,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT - " must be an integer"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT + " must be an integer"); } } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_POINTER) { @@ -111,8 +111,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA - " must be a pointer to a buffer pool"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); } } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { @@ -120,8 +120,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_EXPAND_WILDCARD_ADDRS - " must be an integer"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer"); } } } @@ -185,10 +185,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); - if (!s->shutdown) { - gpr_mu_unlock(&s->mu); - return; - } + GPR_ASSERT(s->shutdown); if (s->head) { grpc_tcp_listener *sp; @@ -216,8 +213,8 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->active_ports) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(exec_ctx, sp->emfd, - GRPC_ERROR_CREATE("Server destroyed")); + grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Server destroyed")); } gpr_mu_unlock(&s->mu); } else { @@ -301,7 +298,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { error: gpr_mu_lock(&sp->server->mu); - if (0 == --sp->server->active_ports) { + if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); } else { @@ -350,12 +347,24 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, } } if (*out_port > 0) { - GRPC_LOG_IF_ERROR("Failed to add :: listener", v6_err); - GRPC_LOG_IF_ERROR("Failed to add 0.0.0.0 listener", v4_err); + if (v6_err != GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "Failed to add :: listener, " + "the environment may not support IPv6: %s", + grpc_error_string(v6_err)); + GRPC_ERROR_UNREF(v6_err); + } + if (v4_err != GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "Failed to add 0.0.0.0 listener, " + "the environment may not support IPv4: %s", + grpc_error_string(v4_err)); + GRPC_ERROR_UNREF(v4_err); + } return GRPC_ERROR_NONE; } else { - grpc_error *root_err = - GRPC_ERROR_CREATE("Failed to add any wildcard listeners"); + grpc_error *root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to add any wildcard listeners"); GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE); root_err = grpc_error_add_child(root_err, v6_err); root_err = grpc_error_add_child(root_err, v4_err); @@ -575,7 +584,7 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { grpc_fd_shutdown(exec_ctx, sp->emfd, - GRPC_ERROR_CREATE("Server shutdown")); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown")); } } gpr_mu_unlock(&s->mu); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.c index e45e27d5ab..af2b00b4b5 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.c @@ -33,7 +33,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_HAVE_IFADDRS +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/tcp_server_utils_posix.h" @@ -210,11 +210,12 @@ error: if (fd >= 0) { close(fd); } - grpc_error *ret = grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1), - GRPC_ERROR_INT_FD, fd); + grpc_error *ret = + grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Unable to configure socket", &err, 1), + GRPC_ERROR_INT_FD, fd); GRPC_ERROR_UNREF(err); return ret; } -#endif /* GRPC_HAVE_IFADDRS */ +#endif /* GRPC_POSIX_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c index 6354a6bdc1..2078a68126 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c @@ -94,7 +94,8 @@ static grpc_error *get_unused_port(int *port) { } close(fd); *port = grpc_sockaddr_get_port(&wild); - return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE; + return *port <= 0 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad port") + : GRPC_ERROR_NONE; } grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, @@ -114,7 +115,7 @@ grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) { return err; } else if (requested_port <= 0) { - return GRPC_ERROR_CREATE("Bad get_unused_port()"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad get_unused_port()"); } gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); } @@ -139,7 +140,7 @@ grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, memcpy(addr.addr, ifa_it->ifa_addr, addr.len); if (!grpc_sockaddr_set_port(&addr, requested_port)) { /* Should never happen, because we check sa_family above. */ - err = GRPC_ERROR_CREATE("Failed to set port"); + err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to set port"); break; } if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) { @@ -163,7 +164,7 @@ grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) { err_str = gpr_strdup("Failed to add listener"); } - root_err = GRPC_ERROR_CREATE(err_str); + root_err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_str); gpr_free(err_str); gpr_free(addr_str); err = grpc_error_add_child(root_err, err); @@ -183,7 +184,7 @@ grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, if (err != GRPC_ERROR_NONE) { return err; } else if (sp == NULL) { - return GRPC_ERROR_CREATE("No local addresses"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No local addresses"); } else { *out_port = sp->port; return GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c index 95c3198be6..d6a1a0693e 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c @@ -41,7 +41,7 @@ grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, unsigned port_index, int requested_port, int *out_port) { - return GRPC_ERROR_CREATE("no ifaddrs available"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ifaddrs available"); } bool grpc_tcp_server_have_ifaddrs(void) { return false; } diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index eed2773f8a..e9246948f5 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -95,8 +95,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA - " must be a pointer to a buffer pool"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); } } } @@ -244,17 +244,19 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, // The last argument to uv_tcp_bind is flags status = uv_tcp_bind(handle, (struct sockaddr *)addr->addr, 0); if (status != 0) { - error = GRPC_ERROR_CREATE("Failed to bind to port"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to bind to port"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); return error; } status = uv_listen((uv_stream_t *)handle, SOMAXCONN, on_connect); if (status != 0) { - error = GRPC_ERROR_CREATE("Failed to listen to port"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to listen to port"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); return error; } @@ -262,9 +264,10 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, status = uv_tcp_getsockname(handle, (struct sockaddr *)&sockname_temp.addr, (int *)&sockname_temp.len); if (status != 0) { - error = GRPC_ERROR_CREATE("getsockname failed"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getsockname failed"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); return error; } @@ -346,15 +349,17 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, if (status == 0) { error = add_socket_to_server(s, handle, addr, port_index, &sp); } else { - error = GRPC_ERROR_CREATE("Failed to initialize UV tcp handle"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to initialize UV tcp handle"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); } gpr_free(allocated_addr); if (error != GRPC_ERROR_NONE) { - grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING( + grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); GRPC_ERROR_UNREF(error); error = error_out; diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index bd4b9b2df1..12ce7d3fdd 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -122,8 +122,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA - " must be a pointer to a buffer pool"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); } } } @@ -248,9 +248,10 @@ failure: GPR_ASSERT(error != GRPC_ERROR_NONE); char *tgtaddr = grpc_sockaddr_to_uri(addr); grpc_error_set_int( - grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING( + grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to prepare server socket", &error, 1), - GRPC_ERROR_STR_TARGET_ADDRESS, tgtaddr), + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(tgtaddr)), GRPC_ERROR_INT_FD, (intptr_t)sock); gpr_free(tgtaddr); GRPC_ERROR_UNREF(error); @@ -533,7 +534,7 @@ done: gpr_free(allocated_addr); if (error != GRPC_ERROR_NONE) { - grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING( + grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); GRPC_ERROR_UNREF(error); error = error_out; diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 5541c62068..8e8db9f7b4 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -152,7 +152,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // TODO(murgatroid99): figure out what the return value here means uv_read_stop(stream); if (nread == UV_EOF) { - error = GRPC_ERROR_CREATE("EOF"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"); } else if (nread > 0) { // Successful read sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); @@ -173,7 +173,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, } } else { // nread < 0: Error - error = GRPC_ERROR_CREATE("TCP Read failed"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"); } grpc_closure_sched(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); @@ -193,9 +193,10 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, status = uv_read_start((uv_stream_t *)tcp->handle, alloc_uv_buf, read_callback); if (status != 0) { - error = GRPC_ERROR_CREATE("TCP Read failed at start"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); grpc_closure_sched(exec_ctx, cb, error); } if (grpc_tcp_trace) { @@ -214,7 +215,7 @@ static void write_callback(uv_write_t *req, int status) { if (status == 0) { error = GRPC_ERROR_NONE; } else { - error = GRPC_ERROR_CREATE("TCP Write failed"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Write failed"); } if (grpc_tcp_trace) { const char *str = grpc_error_string(error); @@ -249,8 +250,8 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down")); + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "TCP socket is shutting down")); return; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 6c413971e3..9134883226 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -175,7 +175,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { if (error == GRPC_ERROR_NONE) { if (info->wsa_error != 0 && !tcp->shutting_down) { char *utf8_message = gpr_format_message(info->wsa_error); - error = GRPC_ERROR_CREATE(utf8_message); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message); gpr_free(utf8_message); grpc_slice_unref_internal(exec_ctx, tcp->read_slice); } else { @@ -185,9 +185,9 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } else { grpc_slice_unref_internal(exec_ctx, tcp->read_slice); error = tcp->shutting_down - ? GRPC_ERROR_CREATE_REFERENCING("TCP stream shutting down", - &tcp->shutdown_error, 1) - : GRPC_ERROR_CREATE("End of TCP stream"); + ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "TCP stream shutting down", &tcp->shutdown_error, 1) + : GRPC_ERROR_CREATE_FROM_STATIC_STRING("End of TCP stream"); } } } @@ -208,9 +208,10 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING( - "TCP socket is shutting down", - &tcp->shutdown_error, 1)); + grpc_closure_sched( + exec_ctx, cb, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "TCP socket is shutting down", &tcp->shutdown_error, 1)); return; } @@ -297,9 +298,10 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING( - "TCP socket is shutting down", - &tcp->shutdown_error, 1)); + grpc_closure_sched( + exec_ctx, cb, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "TCP socket is shutting down", &tcp->shutdown_error, 1)); return; } diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index d4df96c214..e53c801929 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -109,8 +109,9 @@ void grpc_timer_list_init(gpr_timespec now) { void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; - run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, - GRPC_ERROR_CREATE("Timer list shutdown")); + run_some_expired_timers( + exec_ctx, gpr_inf_future(g_clock_type), NULL, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); @@ -182,9 +183,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (!g_initialized) { timer->pending = false; - grpc_closure_sched( - exec_ctx, timer->closure, - GRPC_ERROR_CREATE("Attempt to create timer before initialization")); + grpc_closure_sched(exec_ctx, timer->closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Attempt to create timer before initialization")); return; } @@ -376,7 +377,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, exec_ctx, now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE("Shutting down timer system")); + : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system")); } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index f28a14405d..8e8a07578c 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -78,6 +78,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_timer->data = timer; timer->uv_timer = uv_timer; uv_timer_start(uv_timer, run_expired_timer, timeout, 0); + /* We assume that gRPC timers are only used alongside other active gRPC + objects, and that there will therefore always be something else keeping + the uv loop alive whenever there is a timer */ + uv_unref((uv_handle_t *)uv_timer); } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 71e295770a..60579e18ba 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -59,11 +59,13 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" @@ -89,6 +91,9 @@ struct grpc_udp_listener { struct grpc_udp_server { gpr_mu mu; + /* factory to use for creating and binding sockets, or NULL */ + grpc_socket_factory *socket_factory; + /* active port count: how many ports are actually still listening */ size_t active_ports; /* destroyed port count: how many ports are completely destroyed */ @@ -113,9 +118,24 @@ struct grpc_udp_server { void *user_data; }; -grpc_udp_server *grpc_udp_server_create(void) { +static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) { + if (args) { + const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); + if (arg) { + GPR_ASSERT(arg->type == GRPC_ARG_POINTER); + return arg->value.pointer.p; + } + } + return NULL; +} + +grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); + s->socket_factory = get_socket_factory(args); + if (s->socket_factory) { + grpc_socket_factory_ref(s->socket_factory); + } s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; @@ -139,6 +159,10 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { gpr_free(sp); } + if (s->socket_factory) { + grpc_socket_factory_unref(s->socket_factory); + } + gpr_free(s); } @@ -162,10 +186,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); - if (!s->shutdown) { - gpr_mu_unlock(&s->mu); - return; - } + GPR_ASSERT(s->shutdown); if (s->head) { grpc_udp_listener *sp; @@ -205,8 +226,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data); - grpc_fd_shutdown(exec_ctx, sp->emfd, - GRPC_ERROR_CREATE("Server destroyed")); + grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Server destroyed")); } gpr_mu_unlock(&s->mu); } else { @@ -215,8 +236,17 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, } } +static int bind_socket(grpc_socket_factory *socket_factory, int sockfd, + const grpc_resolved_address *addr) { + return (socket_factory != NULL) + ? grpc_socket_factory_bind(socket_factory, sockfd, addr) + : bind(sockfd, (struct sockaddr *)addr->addr, + (socklen_t)addr->len); +} + /* Prepare a recently-created socket for listening. */ -static int prepare_socket(int fd, const grpc_resolved_address *addr) { +static int prepare_socket(grpc_socket_factory *socket_factory, int fd, + const grpc_resolved_address *addr) { grpc_resolved_address sockname_temp; struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr; /* Set send/receive socket buffers to 1 MB */ @@ -246,7 +276,7 @@ static int prepare_socket(int fd, const grpc_resolved_address *addr) { } GPR_ASSERT(addr->len < ~(socklen_t)0); - if (bind(fd, (struct sockaddr *)addr, (socklen_t)addr->len) < 0) { + if (bind_socket(socket_factory, fd, addr) < 0) { char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); @@ -288,7 +318,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { - if (0 == --sp->server->active_ports) { + if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); } else { @@ -311,7 +341,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_lock(&(sp->server->mu)); if (error != GRPC_ERROR_NONE) { - if (0 == --sp->server->active_ports) { + if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); } else { @@ -339,7 +369,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, char *addr_str; char *name; - port = prepare_socket(fd, addr); + port = prepare_socket(s->socket_factory, fd, addr); if (port >= 0) { grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); @@ -417,8 +447,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, /* Try listening on IPv6 first. */ addr = &wild6; // TODO(rjshade): Test and propagate the returned grpc_error*: - GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, - &dsmode, &fd)); + GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( + s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { @@ -433,8 +463,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, } // TODO(rjshade): Test and propagate the returned grpc_error*: - GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, - &dsmode, &fd)); + GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( + s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); if (fd < 0) { gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 90842a47f0..9df3fe4d1f 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -58,7 +58,7 @@ typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data); /* Create a server, initially not bound to any ports */ -grpc_udp_server *grpc_udp_server_create(void); +grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args); /* Start listening to bound ports. user_data is passed to callbacks. */ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c index 1233cec04e..281865aece 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.c @@ -60,7 +60,7 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name, gpr_asprintf(&err_msg, "Path name should not have more than %" PRIuPTR " characters.", GPR_ARRAY_SIZE(un->sun_path) - 1); - err = GRPC_ERROR_CREATE(err_msg); + err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg); gpr_free(err_msg); return err; } diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.c index 1daf5152c1..b9602cbf8b 100644 --- a/src/core/lib/iomgr/unix_sockets_posix_noop.c +++ b/src/core/lib/iomgr/unix_sockets_posix_noop.c @@ -47,7 +47,8 @@ void grpc_create_socketpair_if_unix(int sv[2]) { grpc_error *grpc_resolve_unix_domain_address( const char *name, grpc_resolved_addresses **addresses) { *addresses = NULL; - return GRPC_ERROR_CREATE("Unix domain sockets are not supported on Windows"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Unix domain sockets are not supported on Windows"); } int grpc_is_unix_socket(const grpc_resolved_address *addr) { return false; } diff --git a/src/core/lib/profiling/basic_timers.c b/src/core/lib/profiling/basic_timers.c index 1f1987fb8e..bc8e27714c 100644 --- a/src/core/lib/profiling/basic_timers.c +++ b/src/core/lib/profiling/basic_timers.c @@ -218,7 +218,7 @@ void gpr_timers_set_log_filename(const char *filename) { static void init_output() { gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); - gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options); + GPR_ASSERT(gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options)); atexit(finish_writing); } diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index dd44621347..97501e6788 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -180,7 +180,7 @@ static grpc_error *create_default_creds_from_path( grpc_slice creds_data = grpc_empty_slice(); grpc_error *error = GRPC_ERROR_NONE; if (creds_path == NULL) { - error = GRPC_ERROR_CREATE("creds_path unset"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("creds_path unset"); goto end; } error = grpc_load_file(creds_path, 0, &creds_data); @@ -190,10 +190,9 @@ static grpc_error *create_default_creds_from_path( json = grpc_json_parse_string_with_len( (char *)GRPC_SLICE_START_PTR(creds_data), GRPC_SLICE_LENGTH(creds_data)); if (json == NULL) { - char *dump = grpc_dump_slice(creds_data, GPR_DUMP_HEX | GPR_DUMP_ASCII); - error = grpc_error_set_str(GRPC_ERROR_CREATE("Failed to parse JSON"), - GRPC_ERROR_STR_RAW_BYTES, dump); - gpr_free(dump); + error = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to parse JSON"), + GRPC_ERROR_STR_RAW_BYTES, grpc_slice_ref_internal(creds_data)); goto end; } @@ -204,7 +203,7 @@ static grpc_error *create_default_creds_from_path( grpc_service_account_jwt_access_credentials_create_from_auth_json_key( exec_ctx, key, grpc_max_auth_token_lifetime()); if (result == NULL) { - error = GRPC_ERROR_CREATE( + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "grpc_service_account_jwt_access_credentials_create_from_auth_json_" "key failed"); } @@ -217,7 +216,7 @@ static grpc_error *create_default_creds_from_path( result = grpc_refresh_token_credentials_create_from_auth_refresh_token(token); if (result == NULL) { - error = GRPC_ERROR_CREATE( + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "grpc_refresh_token_credentials_create_from_auth_refresh_token " "failed"); } @@ -236,7 +235,8 @@ end: grpc_channel_credentials *grpc_google_default_credentials_create(void) { grpc_channel_credentials *result = NULL; grpc_call_credentials *call_creds = NULL; - grpc_error *error = GRPC_ERROR_CREATE("Failed to create Google credentials"); + grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to create Google credentials"); grpc_error *err; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -274,7 +274,8 @@ grpc_channel_credentials *grpc_google_default_credentials_create(void) { call_creds = grpc_google_compute_engine_credentials_create(NULL); if (call_creds == NULL) { error = grpc_error_add_child( - error, GRPC_ERROR_CREATE("Failed to get credentials from network")); + error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to get credentials from network")); } } } diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 68ebc386fa..b69f38758c 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -95,7 +95,8 @@ static void reset_auth_metadata_context( static void add_error(grpc_error **combined, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; if (*combined == GRPC_ERROR_NONE) { - *combined = GRPC_ERROR_CREATE("Client auth metadata plugin error"); + *combined = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Client auth metadata plugin error"); } *combined = grpc_error_add_child(*combined, error); } @@ -114,9 +115,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error = GRPC_ERROR_NONE; if (status != GRPC_CREDENTIALS_OK) { error = grpc_error_set_int( - GRPC_ERROR_CREATE(error_details != NULL && strlen(error_details) > 0 - ? error_details - : "Credentials failed to get metadata."), + GRPC_ERROR_CREATE_FROM_COPIED_STRING( + error_details != NULL && strlen(error_details) > 0 + ? error_details + : "Credentials failed to get metadata."), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); } else { GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); @@ -194,7 +196,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_finish_with_failure( exec_ctx, op, grpc_error_set_int( - GRPC_ERROR_CREATE( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED)); return; @@ -227,9 +229,10 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, host); gpr_free(host); grpc_call_element_signal_error( - exec_ctx, elem, grpc_error_set_int(GRPC_ERROR_CREATE(error_msg), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAUTHENTICATED)); + exec_ctx, elem, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAUTHENTICATED)); gpr_free(error_msg); } } diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 7d58843d69..568d70fa38 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -162,7 +162,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); - call_read_cb(exec_ctx, ep, GRPC_ERROR_CREATE_REFERENCING( + call_read_cb(exec_ctx, ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Secure read failed", &error, 1)); return; } @@ -220,8 +220,10 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, if (result != TSI_OK) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); - call_read_cb(exec_ctx, ep, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Unwrap failed"), result)); + call_read_cb( + exec_ctx, ep, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result)); return; } @@ -332,7 +334,8 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer); grpc_closure_sched( exec_ctx, cb, - grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result)); + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); GPR_TIMER_END("secure_endpoint.endpoint_write", 0); return; } diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index ad083a730f..b0cbc83639 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -137,9 +137,9 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_auth_context **auth_context, grpc_closure *on_peer_checked) { if (sc == NULL) { - grpc_closure_sched( - exec_ctx, on_peer_checked, - GRPC_ERROR_CREATE("cannot check peer -- no security connector")); + grpc_closure_sched(exec_ctx, on_peer_checked, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "cannot check peer -- no security connector")); tsi_peer_destruct(&peer); } else { sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); @@ -330,7 +330,8 @@ static void fake_check_peer(grpc_exec_ctx *exec_ctx, grpc_error *error = GRPC_ERROR_NONE; *auth_context = NULL; if (peer.property_count != 1) { - error = GRPC_ERROR_CREATE("Fake peers should only have 1 property."); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Fake peers should only have 1 property."); goto end; } prop_name = peer.properties[0].name; @@ -339,13 +340,14 @@ static void fake_check_peer(grpc_exec_ctx *exec_ctx, char *msg; gpr_asprintf(&msg, "Unexpected property in fake peer: %s.", prop_name == NULL ? "<EMPTY>" : prop_name); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); goto end; } if (strncmp(peer.properties[0].value.data, TSI_FAKE_CERTIFICATE_TYPE, peer.properties[0].value.length)) { - error = GRPC_ERROR_CREATE("Invalid value for cert type property."); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Invalid value for cert type property."); goto end; } *auth_context = grpc_auth_context_create(NULL); @@ -586,18 +588,19 @@ static grpc_error *ssl_check_peer(grpc_security_connector *sc, const tsi_peer_property *p = tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL); if (p == NULL) { - return GRPC_ERROR_CREATE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Cannot check peer: missing selected ALPN property."); } if (!grpc_chttp2_is_alpn_version_supported(p->value.data, p->value.length)) { - return GRPC_ERROR_CREATE("Cannot check peer: invalid ALPN value."); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Cannot check peer: invalid ALPN value."); } /* Check the peer name if specified. */ if (peer_name != NULL && !ssl_host_matches_name(peer, peer_name)) { char *msg; gpr_asprintf(&msg, "Peer name %s is not in peer certificate", peer_name); - grpc_error *error = GRPC_ERROR_CREATE(msg); + grpc_error *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return error; } diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 7065d261ba..2f39327670 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -120,7 +120,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, if (error == GRPC_ERROR_NONE) { // If we were shut down after the handshake succeeded but before an // endpoint callback was invoked, we need to generate our own error. - error = GRPC_ERROR_CREATE("Handshaker shutdown"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshaker shutdown"); } const char *msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg); @@ -156,7 +156,8 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); if (result != TSI_OK) { error = grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Frame protector creation failed"), result); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), + result); security_handshake_failed_locked(exec_ctx, h, error); goto done; } @@ -191,7 +192,7 @@ static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); if (result != TSI_OK) { return grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Peer extraction failed"), result); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result); } grpc_security_connector_check_peer(exec_ctx, h->connector, peer, &h->auth_context, &h->on_peer_checked); @@ -215,8 +216,8 @@ static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx, } } while (result == TSI_INCOMPLETE_DATA); if (result != TSI_OK) { - return grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Handshake failed"), - result); + return grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result); } // Send data. grpc_slice to_send = @@ -234,8 +235,8 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( - exec_ctx, h, - GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1)); + exec_ctx, h, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Handshake read failed", &error, 1)); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); return; @@ -270,8 +271,9 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, } if (result != TSI_OK) { security_handshake_failed_locked( - exec_ctx, h, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Handshake failed"), result)); + exec_ctx, h, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result)); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); return; @@ -314,8 +316,8 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( - exec_ctx, h, - GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1)); + exec_ctx, h, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Handshake write failed", &error, 1)); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); return; @@ -429,7 +431,8 @@ static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, grpc_closure *on_handshake_done, grpc_handshaker_args *args) { grpc_closure_sched(exec_ctx, on_handshake_done, - GRPC_ERROR_CREATE("Failed to create security handshaker")); + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to create security handshaker")); } static const grpc_handshaker_vtable fail_handshaker_vtable = { diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 9579d27bc4..b103b7400c 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -145,9 +145,10 @@ static void on_md_processing_done( calld->transport_op->send_message = false; } calld->transport_op->send_trailing_metadata = NULL; - grpc_closure_sched(&exec_ctx, calld->on_done_recv, - grpc_error_set_int(GRPC_ERROR_CREATE(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status)); + grpc_closure_sched( + &exec_ctx, calld->on_done_recv, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status)); } grpc_exec_ctx_finish(&exec_ctx); @@ -159,7 +160,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (error == GRPC_ERROR_NONE) { - if (chand->creds->processor.process != NULL) { + if (chand->creds != NULL && chand->creds->processor.process != NULL) { calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata); chand->creds->processor.process( chand->creds->processor.state, calld->auth_context, @@ -246,7 +247,6 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->is_last); GPR_ASSERT(auth_context != NULL); - GPR_ASSERT(creds != NULL); /* initialize members */ chand->auth_context = diff --git a/src/core/lib/security/transport/tsi_error.c b/src/core/lib/security/transport/tsi_error.c index afc1733567..eae0a676b0 100644 --- a/src/core/lib/security/transport/tsi_error.c +++ b/src/core/lib/security/transport/tsi_error.c @@ -34,7 +34,9 @@ #include "src/core/lib/security/transport/tsi_error.h" grpc_error *grpc_set_tsi_error_result(grpc_error *error, tsi_result result) { - return grpc_error_set_int(grpc_error_set_str(error, GRPC_ERROR_STR_TSI_ERROR, - tsi_result_to_string(result)), - GRPC_ERROR_INT_TSI_CODE, result); + return grpc_error_set_int( + grpc_error_set_str( + error, GRPC_ERROR_STR_TSI_ERROR, + grpc_slice_from_static_string(tsi_result_to_string(result))), + GRPC_ERROR_INT_TSI_CODE, result); } diff --git a/src/core/lib/security/util/b64.c b/src/core/lib/security/util/b64.c index 09c8213131..0d5a917660 100644 --- a/src/core/lib/security/util/b64.c +++ b/src/core/lib/security/util/b64.c @@ -71,15 +71,31 @@ static const char base64_url_safe_chars[] = char *grpc_base64_encode(const void *vdata, size_t data_size, int url_safe, int multiline) { - const unsigned char *data = vdata; - const char *base64_chars = - url_safe ? base64_url_safe_chars : base64_url_unsafe_chars; + size_t result_projected_size = + grpc_base64_estimate_encoded_size(data_size, url_safe, multiline); + char *result = gpr_malloc(result_projected_size); + grpc_base64_encode_core(result, vdata, data_size, url_safe, multiline); + return result; +} + +size_t grpc_base64_estimate_encoded_size(size_t data_size, int url_safe, + int multiline) { size_t result_projected_size = 4 * ((data_size + 3) / 3) + 2 * (multiline ? (data_size / (3 * GRPC_BASE64_MULTILINE_NUM_BLOCKS)) : 0) + 1; - char *result = gpr_malloc(result_projected_size); + return result_projected_size; +} + +void grpc_base64_encode_core(char *result, const void *vdata, size_t data_size, + int url_safe, int multiline) { + const unsigned char *data = vdata; + const char *base64_chars = + url_safe ? base64_url_safe_chars : base64_url_unsafe_chars; + const size_t result_projected_size = + grpc_base64_estimate_encoded_size(data_size, url_safe, multiline); + char *current = result; size_t num_blocks = 0; size_t i = 0; @@ -119,7 +135,6 @@ char *grpc_base64_encode(const void *vdata, size_t data_size, int url_safe, GPR_ASSERT(current >= result); GPR_ASSERT((uintptr_t)(current - result) < result_projected_size); result[current - result] = '\0'; - return result; } grpc_slice grpc_base64_decode(grpc_exec_ctx *exec_ctx, const char *b64, diff --git a/src/core/lib/security/util/b64.h b/src/core/lib/security/util/b64.h index d42a136f61..ef52291c6a 100644 --- a/src/core/lib/security/util/b64.h +++ b/src/core/lib/security/util/b64.h @@ -37,10 +37,22 @@ #include <grpc/slice.h> /* Encodes data using base64. It is the caller's responsability to free - the returned char * using gpr_free. Returns NULL on NULL input. */ + the returned char * using gpr_free. Returns NULL on NULL input. + TODO(makdharma) : change the flags to bool from int */ char *grpc_base64_encode(const void *data, size_t data_size, int url_safe, int multiline); +/* estimate the upper bound on size of base64 encoded data. The actual size + * is guaranteed to be less than or equal to the size returned here. */ +size_t grpc_base64_estimate_encoded_size(size_t data_size, int url_safe, + int multiline); + +/* Encodes data using base64 and write it to memory pointed to by result. It is + * the caller's responsiblity to allocate enough memory in |result| to fit the + * encoded data. */ +void grpc_base64_encode_core(char *result, const void *vdata, size_t data_size, + int url_safe, int multiline); + /* Decodes data according to the base64 specification. Returns an empty slice in case of failure. */ grpc_slice grpc_base64_decode(grpc_exec_ctx *exec_ctx, const char *b64, diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c new file mode 100644 index 0000000000..06e8432caf --- /dev/null +++ b/src/core/lib/support/atm.c @@ -0,0 +1,47 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/atm.h> +#include <grpc/support/useful.h> + +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max) { + gpr_atm current; + gpr_atm new; + do { + current = gpr_atm_no_barrier_load(value); + new = GPR_CLAMP(current + delta, min, max); + if (new == current) break; + } while (!gpr_atm_no_barrier_cas(value, current, new)); + return new; +} diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 85df39a393..cabbd1b43d 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -260,7 +260,7 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, static void add_init_error(grpc_error **composite, grpc_error *new) { if (new == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) - *composite = GRPC_ERROR_CREATE("Call creation failed"); + *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed"); *composite = grpc_error_add_child(*composite, new); } @@ -332,17 +332,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, * call. */ if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) { - add_init_error(&error, - GRPC_ERROR_CREATE("Census tracing propagation requested " - "without Census context propagation")); + add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Census tracing propagation requested " + "without Census context propagation")); } grpc_call_context_set( call, GRPC_CONTEXT_TRACING, args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { - add_init_error(&error, - GRPC_ERROR_CREATE("Census context propagation requested " - "without Census tracing propagation")); + add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Census context propagation requested " + "without Census tracing propagation")); } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; @@ -600,8 +600,9 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, static grpc_error *error_from_status(grpc_status_code status, const char *description) { return grpc_error_set_int( - grpc_error_set_str(GRPC_ERROR_CREATE(description), - GRPC_ERROR_STR_GRPC_MESSAGE, description), + grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description), + GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_from_copied_string(description)), GRPC_ERROR_INT_GRPC_STATUS, status); } @@ -621,16 +622,15 @@ static bool get_final_status_from( void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { grpc_status_code code; - const char *msg = NULL; - grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL); + grpc_slice slice; + grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } set_value(code, set_value_user_data); if (details != NULL) { - *details = - msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg); + *details = grpc_slice_ref_internal(slice); } return true; } @@ -893,18 +893,19 @@ static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_error *error = status_code == GRPC_STATUS_OK ? GRPC_ERROR_NONE - : grpc_error_set_int(GRPC_ERROR_CREATE("Error received from peer"), + : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Error received from peer"), GRPC_ERROR_INT_GRPC_STATUS, (intptr_t)status_code); if (b->idx.named.grpc_message != NULL) { - char *msg = - grpc_slice_to_c_string(GRPC_MDVALUE(b->idx.named.grpc_message->md)); - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); - gpr_free(msg); + error = grpc_error_set_str( + error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md))); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); } else if (error != GRPC_ERROR_NONE) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, ""); + error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_empty_slice()); } set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); @@ -1050,8 +1051,8 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) { bctl->errors[0] = NULL; return e; } else { - grpc_error *error = - GRPC_ERROR_CREATE_REFERENCING("Call batch failed", bctl->errors, n); + grpc_error *error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Call batch failed", bctl->errors, n); for (size_t i = 0; i < n; i++) { GRPC_ERROR_UNREF(bctl->errors[i]); bctl->errors[i] = NULL; @@ -1520,7 +1521,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, { grpc_error *override_error = GRPC_ERROR_NONE; if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { - override_error = GRPC_ERROR_CREATE("Error from server send status"); + override_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Error from server send status"); } if (op->data.send_status_from_server.status_details != NULL) { call->send_extra_metadata[1].md = grpc_mdelem_from_slices( @@ -1530,8 +1532,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->send_extra_metadata_count++; char *msg = grpc_slice_to_c_string( GRPC_MDVALUE(call->send_extra_metadata[1].md)); - override_error = grpc_error_set_str( - override_error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); + override_error = + grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)); gpr_free(msg); } set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 2b700b2f67..b4bfb92042 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -85,19 +85,10 @@ struct grpc_channel { static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, - const grpc_channel_args *input_args, - grpc_channel_stack_type channel_stack_type, - grpc_transport *optional_transport) { - grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); - grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder, - input_args); - grpc_channel_stack_builder_set_target(builder, target); - grpc_channel_stack_builder_set_transport(builder, optional_transport); - if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) { - grpc_channel_stack_builder_destroy(exec_ctx, builder); - return NULL; - } +grpc_channel *grpc_channel_create_with_builder( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + grpc_channel_stack_type channel_stack_type) { + char *target = gpr_strdup(grpc_channel_stack_builder_get_target(builder)); grpc_channel_args *args = grpc_channel_args_copy( grpc_channel_stack_builder_get_channel_arguments(builder)); grpc_channel *channel; @@ -108,11 +99,12 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, gpr_log(GPR_ERROR, "channel stack builder failed: %s", grpc_error_string(error)); GRPC_ERROR_UNREF(error); + gpr_free(target); goto done; } memset(channel, 0, sizeof(*channel)); - channel->target = gpr_strdup(target); + channel->target = target; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; @@ -183,10 +175,33 @@ done: return channel; } +grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, + const grpc_channel_args *input_args, + grpc_channel_stack_type channel_stack_type, + grpc_transport *optional_transport) { + grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder, + input_args); + grpc_channel_stack_builder_set_target(builder, target); + grpc_channel_stack_builder_set_transport(builder, optional_transport); + if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) { + grpc_channel_stack_builder_destroy(exec_ctx, builder); + return NULL; + } + return grpc_channel_create_with_builder(exec_ctx, builder, + channel_stack_type); +} + size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) { #define ROUND_UP_SIZE 256 + /* We round up our current estimate to the NEXT value of ROUND_UP_SIZE. + This ensures: + 1. a consistent size allocation when our estimate is drifting slowly + (which is common) - which tends to help most allocators reuse memory + 2. a small amount of allowed growth over the estimate without hitting + the arena size doubling case, reducing overall memory usage */ return ((size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate) + - ROUND_UP_SIZE) & + 2 * ROUND_UP_SIZE) & ~(size_t)(ROUND_UP_SIZE - 1); } @@ -380,7 +395,8 @@ void grpc_channel_destroy(grpc_channel *channel) { grpc_channel_element *elem; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); - op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); + op->disconnect_with_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Destroyed"); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); elem->filter->start_transport_op(&exec_ctx, elem, op); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index c4aebd8b9b..0f203a3e59 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -35,6 +35,7 @@ #define GRPC_CORE_LIB_SURFACE_CHANNEL_H #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/surface/channel_stack_type.h" grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, @@ -42,6 +43,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, grpc_channel_stack_type channel_stack_type, grpc_transport *optional_transport); +grpc_channel *grpc_channel_create_with_builder( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + grpc_channel_stack_type channel_stack_type); + /** Create a call given a grpc_channel, in order to call \a method. Progress is tied to activity on \a pollset_set. The returned call object is meant to be used with \a grpc_call_start_batch_and_execute, which relies on diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c new file mode 100644 index 0000000000..db67a5192b --- /dev/null +++ b/src/core/lib/surface/completion_queue_factory.c @@ -0,0 +1,77 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/surface/completion_queue_factory.h" +#include "src/core/lib/surface/completion_queue.h" + +#include <grpc/support/log.h> + +/* TODO (sreek) - Currently this does not use the attributes arg. This will be + added in a future PR */ +static grpc_completion_queue* default_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attributes) { + return grpc_completion_queue_create(NULL); +} + +static grpc_completion_queue_factory_vtable default_vtable = {default_create}; + +static const grpc_completion_queue_factory g_default_cq_factory = { + "Default Factory", NULL, &default_vtable}; + +const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) { + /* As we add more fields to grpc_completion_queue_attributes, we may have to + change this assert to: + GPR_ASSERT (attributes->version >= 1 && + attributes->version <= GRPC_CQ_CURRENT_VERSION) */ + GPR_ASSERT(attributes->version == 1); + + /* The default factory can handle version 1 of the attributes structure. We + may have to change this as more fields are added to the structure */ + return &g_default_cq_factory; +} + +grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) { + GPR_ASSERT(!reserved); + grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT, + GRPC_CQ_DEFAULT_POLLING}; + return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); +} + +grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { + GPR_ASSERT(!reserved); + grpc_completion_queue_attributes attr = {1, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}; + return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); +} diff --git a/src/core/lib/surface/completion_queue_factory.h b/src/core/lib/surface/completion_queue_factory.h new file mode 100644 index 0000000000..57e90b5090 --- /dev/null +++ b/src/core/lib/surface/completion_queue_factory.h @@ -0,0 +1,51 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H +#define GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H + +#include <grpc/grpc.h> +#include "src/core/lib/surface/completion_queue.h" + +typedef struct grpc_completion_queue_factory_vtable { + grpc_completion_queue* (*create)(const grpc_completion_queue_factory*, + const grpc_completion_queue_attributes*); +} grpc_completion_queue_factory_vtable; + +struct grpc_completion_queue_factory { + const char* name; + void* data; /* Factory specific data */ + grpc_completion_queue_factory_vtable* vtable; +}; + +#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H */ diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 4fe33a7dc9..18b4f3691b 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -92,7 +92,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, op->payload->recv_trailing_metadata.recv_trailing_metadata); } grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, GRPC_ERROR_CREATE("lame client channel")); + exec_ctx, op, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -113,8 +114,9 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); } if (op->send_ping != NULL) { - grpc_closure_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("lame client channel")); + grpc_closure_sched( + exec_ctx, op->send_ping, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } GRPC_ERROR_UNREF(op->disconnect_with_error); if (op->on_consumed != NULL) { diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 0ce82dfd11..0c8a382f38 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -287,10 +287,10 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_channel_element *elem; op->goaway_error = - send_goaway - ? grpc_error_set_int(GRPC_ERROR_CREATE("Server shutdown"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK) - : GRPC_ERROR_NONE; + send_goaway ? grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK) + : GRPC_ERROR_NONE; op->set_accept_stream = true; sc->slice = grpc_slice_from_copied_string("Server shutdown"); op->disconnect_with_error = send_disconnect; @@ -707,8 +707,9 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, return; } - kill_pending_work_locked(exec_ctx, server, - GRPC_ERROR_CREATE("Server Shutdown")); + kill_pending_work_locked( + exec_ctx, server, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { @@ -766,8 +767,8 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, /* do nothing */ } else { grpc_error *src_error = error; - error = - GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Missing :authority or :path", &error, 1); GRPC_ERROR_UNREF(src_error); } @@ -1217,7 +1218,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, op->on_connectivity_state_change = &chand->channel_connectivity_changed; op->connectivity_state = &chand->connectivity_state; if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { - op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); + op->disconnect_with_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"); } grpc_transport_perform_op(exec_ctx, transport, op); } @@ -1275,8 +1277,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - kill_pending_work_locked(&exec_ctx, server, - GRPC_ERROR_CREATE("Server Shutdown")); + kill_pending_work_locked( + &exec_ctx, server, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); gpr_mu_unlock(&server->mu_call); maybe_finish_shutdown(&exec_ctx, server); @@ -1306,8 +1309,9 @@ void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster_init(server, &broadcaster); gpr_mu_unlock(&server->mu_global); - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, false /* send_goaway */, - GRPC_ERROR_CREATE("Cancelling all calls")); + channel_broadcaster_shutdown( + &exec_ctx, &broadcaster, false /* send_goaway */, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls")); grpc_exec_ctx_finish(&exec_ctx); } @@ -1355,16 +1359,16 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, - GRPC_ERROR_CREATE("Server Shutdown")); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); if (request_id == -1) { /* out of request ids: just fail this one */ fail_call(exec_ctx, server, cq_idx, rc, - grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"), - GRPC_ERROR_INT_LIMIT, - server->max_requested_calls_per_cq)); + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), + GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); return GRPC_CALL_OK; } switch (rc->type) { diff --git a/src/core/lib/surface/validate_metadata.c b/src/core/lib/surface/validate_metadata.c index 7ec9137265..6e76c4efe7 100644 --- a/src/core/lib/surface/validate_metadata.c +++ b/src/core/lib/surface/validate_metadata.c @@ -39,6 +39,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" static grpc_error *conforms_to(grpc_slice slice, const uint8_t *legal_bits, @@ -52,9 +53,10 @@ static grpc_error *conforms_to(grpc_slice slice, const uint8_t *legal_bits, if ((legal_bits[byte] & (1 << bit)) == 0) { char *dump = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); grpc_error *error = grpc_error_set_str( - grpc_error_set_int(GRPC_ERROR_CREATE(err_desc), GRPC_ERROR_INT_OFFSET, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_desc), + GRPC_ERROR_INT_OFFSET, p - GRPC_SLICE_START_PTR(slice)), - GRPC_ERROR_STR_RAW_BYTES, dump); + GRPC_ERROR_STR_RAW_BYTES, grpc_slice_from_copied_string(dump)); gpr_free(dump); return error; } @@ -74,10 +76,12 @@ grpc_error *grpc_validate_header_key_is_legal(grpc_slice slice) { 0x80, 0xfe, 0xff, 0xff, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; if (GRPC_SLICE_LENGTH(slice) == 0) { - return GRPC_ERROR_CREATE("Metadata keys cannot be zero length"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Metadata keys cannot be zero length"); } if (GRPC_SLICE_START_PTR(slice)[0] == ':') { - return GRPC_ERROR_CREATE("Metadata keys cannot start with :"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Metadata keys cannot start with :"); } return conforms_to(slice, legal_header_bits, "Illegal header key"); } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index afe1f6164d..3757b25267 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -79,7 +79,8 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, *w->current = GRPC_CHANNEL_SHUTDOWN; error = GRPC_ERROR_NONE; } else { - error = GRPC_ERROR_CREATE("Shutdown connectivity owner"); + error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner"); } grpc_closure_sched(exec_ctx, w->notify, error); gpr_free(w); diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c index ef55e561fb..4e70f8749d 100644 --- a/src/core/lib/transport/error_utils.c +++ b/src/core/lib/transport/error_utils.c @@ -55,7 +55,7 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error, } void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, - grpc_status_code *code, const char **msg, + grpc_status_code *code, grpc_slice *slice, grpc_http2_error_code *http_error) { // Start with the parent error and recurse through the tree of children // until we find the first one that has a status code. @@ -97,11 +97,11 @@ void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, // If the error has a status message, use it. Otherwise, fall back to // the error description. - if (msg != NULL) { - *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE); - if (*msg == NULL && error != GRPC_ERROR_NONE) { - *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION); - if (*msg == NULL) *msg = "unknown error"; // Just in case. + if (slice != NULL) { + if (!grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE, slice)) { + if (!grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION, slice)) { + *slice = grpc_slice_from_static_string("unknown error"); + } } } diff --git a/src/core/lib/transport/error_utils.h b/src/core/lib/transport/error_utils.h index 105338880a..3b44466ab8 100644 --- a/src/core/lib/transport/error_utils.h +++ b/src/core/lib/transport/error_utils.h @@ -44,7 +44,7 @@ /// attributes (code, msg, http_status) are unneeded, they can be passed as /// NULL. void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, - grpc_status_code *code, const char **msg, + grpc_status_code *code, grpc_slice *slice, grpc_http2_error_code *http_status); /// A utility function to check whether there is a clear status code that diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c index fc2c52bd8a..fa73244aa4 100644 --- a/src/core/lib/transport/metadata_batch.c +++ b/src/core/lib/transport/metadata_batch.c @@ -101,12 +101,10 @@ void grpc_metadata_batch_destroy(grpc_exec_ctx *exec_ctx, } grpc_error *grpc_attach_md_to_error(grpc_error *src, grpc_mdelem md) { - char *k = grpc_slice_to_c_string(GRPC_MDKEY(md)); - char *v = grpc_slice_to_c_string(GRPC_MDVALUE(md)); grpc_error *out = grpc_error_set_str( - grpc_error_set_str(src, GRPC_ERROR_STR_KEY, k), GRPC_ERROR_STR_VALUE, v); - gpr_free(k); - gpr_free(v); + grpc_error_set_str(src, GRPC_ERROR_STR_KEY, + grpc_slice_ref_internal(GRPC_MDKEY(md))), + GRPC_ERROR_STR_VALUE, grpc_slice_ref_internal(GRPC_MDVALUE(md))); return out; } @@ -126,7 +124,8 @@ static grpc_error *maybe_link_callout(grpc_metadata_batch *batch, return GRPC_ERROR_NONE; } return grpc_attach_md_to_error( - GRPC_ERROR_CREATE("Unallowed duplicate metadata"), storage->md); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unallowed duplicate metadata"), + storage->md); } static void maybe_unlink_callout(grpc_metadata_batch *batch, @@ -302,7 +301,7 @@ static void add_error(grpc_error **composite, grpc_error *error, const char *composite_error_string) { if (error == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE(composite_error_string); + *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(composite_error_string); } *composite = grpc_error_add_child(*composite, error); } diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 12da2a88fe..1195f75044 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -93,6 +93,18 @@ void grpc_service_config_destroy(grpc_service_config* service_config) { gpr_free(service_config); } +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg) { + const grpc_json* json = service_config->json_tree; + if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) return; + if (strcmp(field->key, "methodConfig") == 0) continue; + process_json(field, arg); + } +} + const char* grpc_service_config_get_lb_policy_name( const grpc_service_config* service_config) { const grpc_json* json = service_config->json_tree; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index cd739a593c..ebfc59b534 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -42,6 +42,12 @@ typedef struct grpc_service_config grpc_service_config; grpc_service_config* grpc_service_config_create(const char* json_string); void grpc_service_config_destroy(grpc_service_config* service_config); +/// Invokes \a process_json() for each global parameter in the service +/// config. \a arg is passed as the second argument to \a process_json(). +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg); + /// Gets the LB policy name from \a service_config. /// Returns NULL if no LB policy name was specified. /// Caller does NOT take ownership. |