diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-12-06 09:47:49 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-06 09:47:49 -0800 |
commit | 1d4e99508409be052bd129ba507bae1fbe7eb7fa (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/http | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) | |
parent | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (diff) |
Merge pull request #13658 from grpc/revert-13058-execctx
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/http')
-rw-r--r-- | src/core/lib/http/httpcli.cc | 127 | ||||
-rw-r--r-- | src/core/lib/http/httpcli.h | 27 | ||||
-rw-r--r-- | src/core/lib/http/httpcli_security_connector.cc | 55 |
3 files changed, 118 insertions, 91 deletions
diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index ed874c4265..73b484b06d 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -63,11 +63,13 @@ typedef struct { static grpc_httpcli_get_override g_get_override = nullptr; static grpc_httpcli_post_override g_post_override = nullptr; -static void plaintext_handshake(void* arg, grpc_endpoint* endpoint, - const char* host, grpc_millis deadline, - void (*on_done)(void* arg, +static void plaintext_handshake(grpc_exec_ctx* exec_ctx, void* arg, + grpc_endpoint* endpoint, const char* host, + grpc_millis deadline, + void (*on_done)(grpc_exec_ctx* exec_ctx, + void* arg, grpc_endpoint* endpoint)) { - on_done(arg, endpoint); + on_done(exec_ctx, arg, endpoint); } const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http", @@ -77,31 +79,34 @@ void grpc_httpcli_context_init(grpc_httpcli_context* context) { context->pollset_set = grpc_pollset_set_create(); } -void grpc_httpcli_context_destroy(grpc_httpcli_context* context) { - grpc_pollset_set_destroy(context->pollset_set); +void grpc_httpcli_context_destroy(grpc_exec_ctx* exec_ctx, + grpc_httpcli_context* context) { + grpc_pollset_set_destroy(exec_ctx, context->pollset_set); } -static void next_address(internal_request* req, grpc_error* due_to_error); +static void next_address(grpc_exec_ctx* exec_ctx, internal_request* req, + grpc_error* due_to_error); -static void finish(internal_request* req, grpc_error* error) { - grpc_polling_entity_del_from_pollset_set(req->pollent, +static void finish(grpc_exec_ctx* exec_ctx, internal_request* req, + grpc_error* error) { + grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - GRPC_CLOSURE_SCHED(req->on_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, req->on_done, error); grpc_http_parser_destroy(&req->parser); if (req->addresses != nullptr) { grpc_resolved_addresses_destroy(req->addresses); } if (req->ep != nullptr) { - grpc_endpoint_destroy(req->ep); + grpc_endpoint_destroy(exec_ctx, req->ep); } - grpc_slice_unref_internal(req->request_text); + grpc_slice_unref_internal(exec_ctx, req->request_text); gpr_free(req->host); gpr_free(req->ssl_host_override); grpc_iomgr_unregister_object(&req->iomgr_obj); - grpc_slice_buffer_destroy_internal(&req->incoming); - grpc_slice_buffer_destroy_internal(&req->outgoing); + grpc_slice_buffer_destroy_internal(exec_ctx, &req->incoming); + grpc_slice_buffer_destroy_internal(exec_ctx, &req->outgoing); GRPC_ERROR_UNREF(req->overall_error); - grpc_resource_quota_unref_internal(req->resource_quota); + grpc_resource_quota_unref_internal(exec_ctx, req->resource_quota); gpr_free(req); } @@ -119,11 +124,12 @@ static void append_error(internal_request* req, grpc_error* error) { gpr_free(addr_text); } -static void do_read(internal_request* req) { - grpc_endpoint_read(req->ep, &req->incoming, &req->on_read); +static void do_read(grpc_exec_ctx* exec_ctx, internal_request* req) { + grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read); } -static void on_read(void* user_data, grpc_error* error) { +static void on_read(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_error* error) { internal_request* req = (internal_request*)user_data; size_t i; @@ -133,70 +139,77 @@ static void on_read(void* user_data, grpc_error* error) { grpc_error* err = grpc_http_parser_parse( &req->parser, req->incoming.slices[i], nullptr); if (err != GRPC_ERROR_NONE) { - finish(req, err); + finish(exec_ctx, req, err); return; } } } if (error == GRPC_ERROR_NONE) { - do_read(req); + do_read(exec_ctx, req); } else if (!req->have_read_byte) { - next_address(req, GRPC_ERROR_REF(error)); + next_address(exec_ctx, req, GRPC_ERROR_REF(error)); } else { - finish(req, grpc_http_parser_eof(&req->parser)); + finish(exec_ctx, req, grpc_http_parser_eof(&req->parser)); } } -static void on_written(internal_request* req) { do_read(req); } +static void on_written(grpc_exec_ctx* exec_ctx, internal_request* req) { + do_read(exec_ctx, req); +} -static void done_write(void* arg, grpc_error* error) { +static void done_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { internal_request* req = (internal_request*)arg; if (error == GRPC_ERROR_NONE) { - on_written(req); + on_written(exec_ctx, req); } else { - next_address(req, GRPC_ERROR_REF(error)); + next_address(exec_ctx, req, GRPC_ERROR_REF(error)); } } -static void start_write(internal_request* req) { +static void start_write(grpc_exec_ctx* exec_ctx, internal_request* req) { grpc_slice_ref_internal(req->request_text); grpc_slice_buffer_add(&req->outgoing, req->request_text); - grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write); + grpc_endpoint_write(exec_ctx, req->ep, &req->outgoing, &req->done_write); } -static void on_handshake_done(void* arg, grpc_endpoint* ep) { +static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_endpoint* ep) { internal_request* req = (internal_request*)arg; if (!ep) { - next_address(req, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Unexplained handshake failure")); + next_address( + exec_ctx, req, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexplained handshake failure")); return; } req->ep = ep; - start_write(req); + start_write(exec_ctx, req); } -static void on_connected(void* arg, grpc_error* error) { +static void on_connected(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { internal_request* req = (internal_request*)arg; if (!req->ep) { - next_address(req, GRPC_ERROR_REF(error)); + next_address(exec_ctx, req, GRPC_ERROR_REF(error)); return; } req->handshaker->handshake( - req, req->ep, req->ssl_host_override ? req->ssl_host_override : req->host, + exec_ctx, req, req->ep, + req->ssl_host_override ? req->ssl_host_override : req->host, req->deadline, on_handshake_done); } -static void next_address(internal_request* req, grpc_error* error) { +static void next_address(grpc_exec_ctx* exec_ctx, internal_request* req, + grpc_error* error) { grpc_resolved_address* addr; if (error != GRPC_ERROR_NONE) { append_error(req, error); } if (req->next_address == req->addresses->naddrs) { - finish(req, + finish(exec_ctx, req, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed HTTP requests to all targets", &req->overall_error, 1)); return; @@ -208,21 +221,23 @@ static void next_address(internal_request* req, grpc_error* error) { (char*)GRPC_ARG_RESOURCE_QUOTA, req->resource_quota, grpc_resource_quota_arg_vtable()); grpc_channel_args args = {1, &arg}; - grpc_tcp_client_connect(&req->connected, &req->ep, req->context->pollset_set, - &args, addr, req->deadline); + grpc_tcp_client_connect(exec_ctx, &req->connected, &req->ep, + req->context->pollset_set, &args, addr, + req->deadline); } -static void on_resolved(void* arg, grpc_error* error) { +static void on_resolved(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { internal_request* req = (internal_request*)arg; if (error != GRPC_ERROR_NONE) { - finish(req, GRPC_ERROR_REF(error)); + finish(exec_ctx, req, GRPC_ERROR_REF(error)); return; } req->next_address = 0; - next_address(req, GRPC_ERROR_NONE); + next_address(exec_ctx, req, GRPC_ERROR_NONE); } -static void internal_request_begin(grpc_httpcli_context* context, +static void internal_request_begin(grpc_exec_ctx* exec_ctx, + grpc_httpcli_context* context, grpc_polling_entity* pollent, grpc_resource_quota* resource_quota, const grpc_httpcli_request* request, @@ -252,31 +267,33 @@ static void internal_request_begin(grpc_httpcli_context* context, req->ssl_host_override = gpr_strdup(request->ssl_host_override); GPR_ASSERT(pollent); - grpc_polling_entity_add_to_pollset_set(req->pollent, + grpc_polling_entity_add_to_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); grpc_resolve_address( - request->host, req->handshaker->default_port, req->context->pollset_set, + exec_ctx, request->host, req->handshaker->default_port, + req->context->pollset_set, GRPC_CLOSURE_CREATE(on_resolved, req, grpc_schedule_on_exec_ctx), &req->addresses); } -void grpc_httpcli_get(grpc_httpcli_context* context, +void grpc_httpcli_get(grpc_exec_ctx* exec_ctx, grpc_httpcli_context* context, grpc_polling_entity* pollent, grpc_resource_quota* resource_quota, const grpc_httpcli_request* request, grpc_millis deadline, grpc_closure* on_done, grpc_httpcli_response* response) { char* name; - if (g_get_override && g_get_override(request, deadline, on_done, response)) { + if (g_get_override && + g_get_override(exec_ctx, request, deadline, on_done, response)) { return; } gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->http.path); - internal_request_begin(context, pollent, resource_quota, request, deadline, - on_done, response, name, + internal_request_begin(exec_ctx, context, pollent, resource_quota, request, + deadline, on_done, response, name, grpc_httpcli_format_get_request(request)); gpr_free(name); } -void grpc_httpcli_post(grpc_httpcli_context* context, +void grpc_httpcli_post(grpc_exec_ctx* exec_ctx, grpc_httpcli_context* context, grpc_polling_entity* pollent, grpc_resource_quota* resource_quota, const grpc_httpcli_request* request, @@ -284,14 +301,16 @@ void grpc_httpcli_post(grpc_httpcli_context* context, grpc_millis deadline, grpc_closure* on_done, grpc_httpcli_response* response) { char* name; - if (g_post_override && g_post_override(request, body_bytes, body_size, - deadline, on_done, response)) { + if (g_post_override && + g_post_override(exec_ctx, request, body_bytes, body_size, deadline, + on_done, response)) { return; } gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->http.path); internal_request_begin( - context, pollent, resource_quota, request, deadline, on_done, response, - name, grpc_httpcli_format_post_request(request, body_bytes, body_size)); + exec_ctx, context, pollent, resource_quota, request, deadline, on_done, + response, name, + grpc_httpcli_format_post_request(request, body_bytes, body_size)); gpr_free(name); } diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 72d20cc7a3..6f675568bd 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -41,9 +41,10 @@ typedef struct grpc_httpcli_context { typedef struct { const char* default_port; - void (*handshake)(void* arg, grpc_endpoint* endpoint, const char* host, - grpc_millis deadline, - void (*on_done)(void* arg, grpc_endpoint* endpoint)); + void (*handshake)(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* endpoint, + const char* host, grpc_millis deadline, + void (*on_done)(grpc_exec_ctx* exec_ctx, void* arg, + grpc_endpoint* endpoint)); } grpc_httpcli_handshaker; extern const grpc_httpcli_handshaker grpc_httpcli_plaintext; @@ -67,7 +68,8 @@ typedef struct grpc_httpcli_request { typedef struct grpc_http_response grpc_httpcli_response; void grpc_httpcli_context_init(grpc_httpcli_context* context); -void grpc_httpcli_context_destroy(grpc_httpcli_context* context); +void grpc_httpcli_context_destroy(grpc_exec_ctx* exec_ctx, + grpc_httpcli_context* context); /* Asynchronously perform a HTTP GET. 'context' specifies the http context under which to do the get @@ -78,7 +80,7 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context* context); destroyed once the call returns 'deadline' contains a deadline for the request (or gpr_inf_future) 'on_response' is a callback to report results to */ -void grpc_httpcli_get(grpc_httpcli_context* context, +void grpc_httpcli_get(grpc_exec_ctx* exec_ctx, grpc_httpcli_context* context, grpc_polling_entity* pollent, grpc_resource_quota* resource_quota, const grpc_httpcli_request* request, grpc_millis deadline, @@ -99,7 +101,7 @@ void grpc_httpcli_get(grpc_httpcli_context* context, lifetime of the request 'on_response' is a callback to report results to Does not support ?var1=val1&var2=val2 in the path. */ -void grpc_httpcli_post(grpc_httpcli_context* context, +void grpc_httpcli_post(grpc_exec_ctx* exec_ctx, grpc_httpcli_context* context, grpc_polling_entity* pollent, grpc_resource_quota* resource_quota, const grpc_httpcli_request* request, @@ -108,16 +110,15 @@ void grpc_httpcli_post(grpc_httpcli_context* context, grpc_httpcli_response* response); /* override functions return 1 if they handled the request, 0 otherwise */ -typedef int (*grpc_httpcli_get_override)(const grpc_httpcli_request* request, +typedef int (*grpc_httpcli_get_override)(grpc_exec_ctx* exec_ctx, + const grpc_httpcli_request* request, grpc_millis deadline, grpc_closure* on_complete, grpc_httpcli_response* response); -typedef int (*grpc_httpcli_post_override)(const grpc_httpcli_request* request, - const char* body_bytes, - size_t body_size, - grpc_millis deadline, - grpc_closure* on_complete, - grpc_httpcli_response* response); +typedef int (*grpc_httpcli_post_override)( + grpc_exec_ctx* exec_ctx, const grpc_httpcli_request* request, + const char* body_bytes, size_t body_size, grpc_millis deadline, + grpc_closure* on_complete, grpc_httpcli_response* response); void grpc_httpcli_set_override(grpc_httpcli_get_override get, grpc_httpcli_post_override post); diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index bfb536a921..dfcaee702b 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -38,7 +38,8 @@ typedef struct { char* secure_peer_name; } grpc_httpcli_ssl_channel_security_connector; -static void httpcli_ssl_destroy(grpc_security_connector* sc) { +static void httpcli_ssl_destroy(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc) { grpc_httpcli_ssl_channel_security_connector* c = (grpc_httpcli_ssl_channel_security_connector*)sc; if (c->handshaker_factory != nullptr) { @@ -49,7 +50,8 @@ static void httpcli_ssl_destroy(grpc_security_connector* sc) { gpr_free(sc); } -static void httpcli_ssl_add_handshakers(grpc_channel_security_connector* sc, +static void httpcli_ssl_add_handshakers(grpc_exec_ctx* exec_ctx, + grpc_channel_security_connector* sc, grpc_handshake_manager* handshake_mgr) { grpc_httpcli_ssl_channel_security_connector* c = (grpc_httpcli_ssl_channel_security_connector*)sc; @@ -63,11 +65,13 @@ static void httpcli_ssl_add_handshakers(grpc_channel_security_connector* sc, } } grpc_handshake_manager_add( - handshake_mgr, grpc_security_handshaker_create( - tsi_create_adapter_handshaker(handshaker), &sc->base)); + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(handshaker), &sc->base)); } -static void httpcli_ssl_check_peer(grpc_security_connector* sc, tsi_peer peer, +static void httpcli_ssl_check_peer(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked) { grpc_httpcli_ssl_channel_security_connector* c = @@ -83,7 +87,7 @@ static void httpcli_ssl_check_peer(grpc_security_connector* sc, tsi_peer peer, error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); } - GRPC_CLOSURE_SCHED(on_peer_checked, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } @@ -100,8 +104,8 @@ static grpc_security_connector_vtable httpcli_ssl_vtable = { httpcli_ssl_destroy, httpcli_ssl_check_peer, httpcli_ssl_cmp}; static grpc_security_status httpcli_ssl_channel_security_connector_create( - const char* pem_root_certs, const char* secure_peer_name, - grpc_channel_security_connector** sc) { + grpc_exec_ctx* exec_ctx, const char* pem_root_certs, + const char* secure_peer_name, grpc_channel_security_connector** sc) { tsi_result result = TSI_OK; grpc_httpcli_ssl_channel_security_connector* c; @@ -124,12 +128,12 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", tsi_result_to_string(result)); - httpcli_ssl_destroy(&c->base.base); + httpcli_ssl_destroy(exec_ctx, &c->base.base); *sc = nullptr; return GRPC_SECURITY_ERROR; } // We don't actually need a channel credentials object in this case, - // but we set it to a non-nullptr address so that we don't trigger + // but we set it to a non-NULL address so that we don't trigger // assertions in grpc_channel_security_connector_cmp(). c->base.channel_creds = (grpc_channel_credentials*)1; c->base.add_handshakers = httpcli_ssl_add_handshakers; @@ -140,37 +144,40 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( /* handshaker */ typedef struct { - void (*func)(void* arg, grpc_endpoint* endpoint); + void (*func)(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* endpoint); void* arg; grpc_handshake_manager* handshake_mgr; } on_done_closure; -static void on_handshake_done(void* arg, grpc_error* error) { +static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_handshaker_args* args = (grpc_handshaker_args*)arg; on_done_closure* c = (on_done_closure*)args->user_data; if (error != GRPC_ERROR_NONE) { const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "Secure transport setup failed: %s", msg); - c->func(c->arg, nullptr); + c->func(exec_ctx, c->arg, nullptr); } else { - grpc_channel_args_destroy(args->args); - grpc_slice_buffer_destroy_internal(args->read_buffer); + grpc_channel_args_destroy(exec_ctx, args->args); + grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer); gpr_free(args->read_buffer); - c->func(c->arg, args->endpoint); + c->func(exec_ctx, c->arg, args->endpoint); } - grpc_handshake_manager_destroy(c->handshake_mgr); + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } -static void ssl_handshake(void* arg, grpc_endpoint* tcp, const char* host, +static void ssl_handshake(grpc_exec_ctx* exec_ctx, void* arg, + grpc_endpoint* tcp, const char* host, grpc_millis deadline, - void (*on_done)(void* arg, grpc_endpoint* endpoint)) { + void (*on_done)(grpc_exec_ctx* exec_ctx, void* arg, + grpc_endpoint* endpoint)) { on_done_closure* c = (on_done_closure*)gpr_malloc(sizeof(*c)); const char* pem_root_certs = grpc_get_default_ssl_roots(); if (pem_root_certs == nullptr) { gpr_log(GPR_ERROR, "Could not get default pem root certs."); - on_done(arg, nullptr); + on_done(exec_ctx, arg, nullptr); gpr_free(c); return; } @@ -178,16 +185,16 @@ static void ssl_handshake(void* arg, grpc_endpoint* tcp, const char* host, c->arg = arg; grpc_channel_security_connector* sc = nullptr; GPR_ASSERT(httpcli_ssl_channel_security_connector_create( - pem_root_certs, host, &sc) == GRPC_SECURITY_OK); + exec_ctx, pem_root_certs, host, &sc) == GRPC_SECURITY_OK); grpc_arg channel_arg = grpc_security_connector_to_arg(&sc->base); grpc_channel_args args = {1, &channel_arg}; c->handshake_mgr = grpc_handshake_manager_create(); - grpc_handshakers_add(HANDSHAKER_CLIENT, &args, c->handshake_mgr); + grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, &args, c->handshake_mgr); grpc_handshake_manager_do_handshake( - c->handshake_mgr, nullptr /* interested_parties */, tcp, + exec_ctx, c->handshake_mgr, nullptr /* interested_parties */, tcp, nullptr /* channel_args */, deadline, nullptr /* acceptor */, on_handshake_done, c /* user_data */); - GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli"); + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &sc->base, "httpcli"); } const grpc_httpcli_handshaker grpc_httpcli_ssl = {"https", ssl_handshake}; |