From d1bec03fa148344b8eac2b59517252d86e4ca858 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 18 Sep 2015 17:29:00 -0700 Subject: Call list progress --- src/core/httpcli/httpcli.c | 127 +++++++++++++------------- src/core/httpcli/httpcli.h | 13 ++- src/core/httpcli/httpcli_security_connector.c | 19 ++-- 3 files changed, 81 insertions(+), 78 deletions(-) (limited to 'src/core/httpcli') diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index c484f4c7b3..e35fa20953 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -65,6 +65,7 @@ typedef struct { gpr_slice_buffer outgoing; grpc_closure on_read; grpc_closure done_write; + grpc_closure connected; grpc_workqueue *workqueue; } internal_request; @@ -74,8 +75,10 @@ static grpc_httpcli_post_override g_post_override = NULL; static void plaintext_handshake(void *arg, grpc_endpoint *endpoint, const char *host, void (*on_done)(void *arg, - grpc_endpoint *endpoint)) { - on_done(arg, endpoint); + grpc_endpoint *endpoint, + grpc_call_list *call_list), + grpc_call_list *call_list) { + on_done(arg, endpoint, call_list); } const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http", @@ -89,17 +92,19 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context) { grpc_pollset_set_destroy(&context->pollset_set); } -static void next_address(internal_request *req); +static void next_address(internal_request *req, grpc_call_list *call_list); -static void finish(internal_request *req, int success) { - grpc_pollset_set_del_pollset(&req->context->pollset_set, req->pollset); - req->on_response(req->user_data, success ? &req->parser.r : NULL); +static void finish(internal_request *req, int success, + grpc_call_list *call_list) { + grpc_pollset_set_del_pollset(&req->context->pollset_set, req->pollset, + call_list); + req->on_response(req->user_data, success ? &req->parser.r : NULL, call_list); grpc_httpcli_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); } if (req->ep != NULL) { - grpc_endpoint_destroy(req->ep); + grpc_endpoint_destroy(req->ep, call_list); } gpr_slice_unref(req->request_text); gpr_free(req->host); @@ -110,22 +115,13 @@ static void finish(internal_request *req, int success) { gpr_free(req); } -static void on_read(void *user_data, int success); +static void on_read(void *user_data, int success, grpc_call_list *call_list); -static void do_read(internal_request *req) { - switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) { - case GRPC_ENDPOINT_DONE: - on_read(req, 1); - break; - case GRPC_ENDPOINT_PENDING: - break; - case GRPC_ENDPOINT_ERROR: - on_read(req, 0); - break; - } +static void do_read(internal_request *req, grpc_call_list *call_list) { + grpc_endpoint_read(req->ep, &req->incoming, &req->on_read, call_list); } -static void on_read(void *user_data, int success) { +static void on_read(void *user_data, int success, grpc_call_list *call_list) { internal_request *req = user_data; size_t i; @@ -133,99 +129,94 @@ static void on_read(void *user_data, int success) { if (GPR_SLICE_LENGTH(req->incoming.slices[i])) { req->have_read_byte = 1; if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) { - finish(req, 0); + finish(req, 0, call_list); return; } } } if (success) { - do_read(req); + do_read(req, call_list); } else if (!req->have_read_byte) { - next_address(req); + next_address(req, call_list); } else { - finish(req, grpc_httpcli_parser_eof(&req->parser)); + finish(req, grpc_httpcli_parser_eof(&req->parser), call_list); } } -static void on_written(internal_request *req) { do_read(req); } +static void on_written(internal_request *req, grpc_call_list *call_list) { + do_read(req, call_list); +} -static void done_write(void *arg, int success) { +static void done_write(void *arg, int success, grpc_call_list *call_list) { internal_request *req = arg; if (success) { - on_written(req); + on_written(req, call_list); } else { - next_address(req); + next_address(req, call_list); } } -static void start_write(internal_request *req) { +static void start_write(internal_request *req, grpc_call_list *call_list) { gpr_slice_ref(req->request_text); gpr_slice_buffer_add(&req->outgoing, req->request_text); - switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) { - case GRPC_ENDPOINT_DONE: - on_written(req); - break; - case GRPC_ENDPOINT_PENDING: - break; - case GRPC_ENDPOINT_ERROR: - finish(req, 0); - break; - } + grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, call_list); } -static void on_handshake_done(void *arg, grpc_endpoint *ep) { +static void on_handshake_done(void *arg, grpc_endpoint *ep, + grpc_call_list *call_list) { internal_request *req = arg; if (!ep) { - next_address(req); + next_address(req, call_list); return; } req->ep = ep; - start_write(req); + start_write(req, call_list); } -static void on_connected(void *arg, grpc_endpoint *tcp) { +static void on_connected(void *arg, int success, grpc_call_list *call_list) { internal_request *req = arg; - if (!tcp) { - next_address(req); + if (!req->ep) { + next_address(req, call_list); return; } - req->handshaker->handshake(req, tcp, req->host, on_handshake_done); + req->handshaker->handshake(req, req->ep, req->host, on_handshake_done, + call_list); } -static void next_address(internal_request *req) { +static void next_address(internal_request *req, grpc_call_list *call_list) { grpc_resolved_address *addr; if (req->next_address == req->addresses->naddrs) { - finish(req, 0); + finish(req, 0, call_list); return; } addr = &req->addresses->addrs[req->next_address++]; - grpc_tcp_client_connect(on_connected, req, &req->context->pollset_set, + grpc_closure_init(&req->connected, on_connected, req); + grpc_tcp_client_connect(&req->connected, &req->ep, &req->context->pollset_set, req->workqueue, (struct sockaddr *)&addr->addr, - addr->len, req->deadline); + addr->len, req->deadline, call_list); } -static void on_resolved(void *arg, grpc_resolved_addresses *addresses) { +static void on_resolved(void *arg, grpc_resolved_addresses *addresses, + grpc_call_list *call_list) { internal_request *req = arg; if (!addresses) { - finish(req, 0); + finish(req, 0, call_list); return; } req->addresses = addresses; req->next_address = 0; - next_address(req); + next_address(req, call_list); } -static void internal_request_begin(grpc_httpcli_context *context, - grpc_pollset *pollset, - const grpc_httpcli_request *request, - gpr_timespec deadline, - grpc_httpcli_response_cb on_response, - void *user_data, const char *name, - gpr_slice request_text) { +static void internal_request_begin( + grpc_httpcli_context *context, grpc_pollset *pollset, + const grpc_httpcli_request *request, gpr_timespec deadline, + grpc_httpcli_response_cb on_response, void *user_data, const char *name, + gpr_slice request_text, grpc_call_list *call_list) { internal_request *req = gpr_malloc(sizeof(internal_request)); memset(req, 0, sizeof(*req)); req->request_text = request_text; @@ -243,10 +234,11 @@ static void internal_request_begin(grpc_httpcli_context *context, gpr_slice_buffer_init(&req->outgoing); grpc_iomgr_register_object(&req->iomgr_obj, name); req->host = gpr_strdup(request->host); - req->workqueue = grpc_workqueue_create(); + req->workqueue = grpc_workqueue_create(call_list); grpc_workqueue_add_to_pollset(req->workqueue, pollset); - grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset); + grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset, + call_list); grpc_resolve_address(request->host, req->handshaker->default_port, on_resolved, req); } @@ -254,7 +246,8 @@ static void internal_request_begin(grpc_httpcli_context *context, void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, const grpc_httpcli_request *request, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { char *name; if (g_get_override && g_get_override(request, deadline, on_response, user_data)) { @@ -263,7 +256,7 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); internal_request_begin(context, pollset, request, deadline, on_response, user_data, name, - grpc_httpcli_format_get_request(request)); + grpc_httpcli_format_get_request(request), call_list); gpr_free(name); } @@ -271,7 +264,8 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { char *name; if (g_post_override && g_post_override(request, body_bytes, body_size, deadline, on_response, user_data)) { @@ -280,7 +274,8 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path); internal_request_begin( context, pollset, request, deadline, on_response, user_data, name, - grpc_httpcli_format_post_request(request, body_bytes, body_size)); + grpc_httpcli_format_post_request(request, body_bytes, body_size), + call_list); gpr_free(name); } diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index c45966714c..74bb123042 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -62,7 +62,9 @@ typedef struct grpc_httpcli_context { typedef struct { const char *default_port; void (*handshake)(void *arg, grpc_endpoint *endpoint, const char *host, - void (*on_done)(void *arg, grpc_endpoint *endpoint)); + void (*on_done)(void *arg, grpc_endpoint *endpoint, + grpc_call_list *call_list), + grpc_call_list *call_list); } grpc_httpcli_handshaker; extern const grpc_httpcli_handshaker grpc_httpcli_plaintext; @@ -97,7 +99,8 @@ typedef struct grpc_httpcli_response { /* Callback for grpc_httpcli_get and grpc_httpcli_post. */ typedef void (*grpc_httpcli_response_cb)(void *user_data, - const grpc_httpcli_response *response); + const grpc_httpcli_response *response, + grpc_call_list *call_list); void grpc_httpcli_context_init(grpc_httpcli_context *context); void grpc_httpcli_context_destroy(grpc_httpcli_context *context); @@ -115,7 +118,8 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context); void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, const grpc_httpcli_request *request, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data); + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list); /* Asynchronously perform a HTTP POST. 'context' specifies the http context under which to do the post @@ -136,7 +140,8 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data); + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list); /* override functions return 1 if they handled the request, 0 otherwise */ typedef int (*grpc_httpcli_get_override)(const grpc_httpcli_request *request, diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/httpcli/httpcli_security_connector.c index 7887f9d530..a11fcd3e62 100644 --- a/src/core/httpcli/httpcli_security_connector.c +++ b/src/core/httpcli/httpcli_security_connector.c @@ -134,33 +134,36 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( /* handshaker */ typedef struct { - void (*func)(void *arg, grpc_endpoint *endpoint); + void (*func)(void *arg, grpc_endpoint *endpoint, grpc_call_list *call_list); void *arg; } on_done_closure; static void on_secure_transport_setup_done(void *rp, grpc_security_status status, grpc_endpoint *wrapped_endpoint, - grpc_endpoint *secure_endpoint) { + grpc_endpoint *secure_endpoint, + grpc_call_list *call_list) { on_done_closure *c = rp; if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); - c->func(c->arg, NULL); + c->func(c->arg, NULL, call_list); } else { - c->func(c->arg, secure_endpoint); + c->func(c->arg, secure_endpoint, call_list); } gpr_free(c); } static void ssl_handshake(void *arg, grpc_endpoint *tcp, const char *host, - void (*on_done)(void *arg, grpc_endpoint *endpoint)) { + void (*on_done)(void *arg, grpc_endpoint *endpoint, + grpc_call_list *call_list), + grpc_call_list *call_list) { grpc_channel_security_connector *sc = NULL; const unsigned char *pem_root_certs = NULL; on_done_closure *c = gpr_malloc(sizeof(*c)); size_t pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs); if (pem_root_certs == NULL || pem_root_certs_size == 0) { gpr_log(GPR_ERROR, "Could not get default pem root certs."); - on_done(arg, NULL); + on_done(arg, NULL, call_list); gpr_free(c); return; } @@ -169,8 +172,8 @@ static void ssl_handshake(void *arg, grpc_endpoint *tcp, const char *host, GPR_ASSERT(httpcli_ssl_channel_security_connector_create( pem_root_certs, pem_root_certs_size, host, &sc) == GRPC_SECURITY_OK); - grpc_setup_secure_transport(&sc->base, tcp, on_secure_transport_setup_done, - c); + grpc_setup_secure_transport(&sc->base, tcp, on_secure_transport_setup_done, c, + call_list); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli"); } -- cgit v1.2.3