diff options
author | Mark D. Roth <roth@google.com> | 2016-08-04 13:41:46 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-08-04 13:41:46 -0700 |
commit | 5b4768f6f407662135e95d1c696593c40af93d4d (patch) | |
tree | 583342aa7b1e3ac91987918de205180eeeec3fe5 /src/core | |
parent | 0a05ab6e8e8e605f0fe4f2d0f4373d660bbdfe06 (diff) |
Implement timeout support.
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/client_config/http_connect_handshaker.c | 139 |
1 files changed, 87 insertions, 52 deletions
diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c index 54f592ef61..82d84da580 100644 --- a/src/core/ext/client_config/http_connect_handshaker.c +++ b/src/core/ext/client_config/http_connect_handshaker.c @@ -42,6 +42,7 @@ #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/timer.h" typedef struct http_connect_handshaker { // Base class. Must be first. @@ -63,51 +64,81 @@ typedef struct http_connect_handshaker { grpc_closure response_read_closure; grpc_http_parser http_parser; grpc_http_response http_response; + grpc_timer timeout_timer; + + gpr_refcount refcount; } http_connect_handshaker; +// Unref and clean up handshaker. +static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { + if (gpr_unref(&handshaker->refcount)) { + gpr_free(handshaker->proxy_server); + gpr_free(handshaker->server_name); + gpr_slice_buffer_destroy(&handshaker->write_buffer); + grpc_http_parser_destroy(&handshaker->http_parser); + grpc_http_response_destroy(&handshaker->http_response); + gpr_free(handshaker); + } +} + +// Callback invoked when deadline is exceeded. +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error == GRPC_ERROR_NONE) // Timer fired, rather than being cancelled. + grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint); + http_connect_handshaker_unref(handshaker); +} + // Callback invoked when finished writing HTTP CONNECT request. static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - http_connect_handshaker* h = arg; + http_connect_handshaker* handshaker = arg; if (error != GRPC_ERROR_NONE) { // If the write failed, invoke the callback immediately with the error. - h->cb(exec_ctx, h->endpoint, h->args, h->read_buffer, h->user_data, - GRPC_ERROR_REF(error)); + handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, + handshaker->read_buffer, handshaker->user_data, + GRPC_ERROR_REF(error)); } else { // Otherwise, read the response. - grpc_endpoint_read(exec_ctx, h->endpoint, h->read_buffer, - &h->response_read_closure); + grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + &handshaker->response_read_closure); } } // Callback invoked for reading HTTP CONNECT response. static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - http_connect_handshaker* h = arg; + http_connect_handshaker* handshaker = arg; if (error != GRPC_ERROR_NONE) { GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. goto done; } // Add buffer to parser. - for (size_t i = 0; i < h->read_buffer->count; ++i) { - if (GPR_SLICE_LENGTH(h->read_buffer->slices[i]) > 0) { + for (size_t i = 0; i < handshaker->read_buffer->count; ++i) { + if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) { size_t body_start_offset = 0; - error = grpc_http_parser_parse(&h->http_parser, h->read_buffer->slices[i], + error = grpc_http_parser_parse(&handshaker->http_parser, + handshaker->read_buffer->slices[i], &body_start_offset); if (error != GRPC_ERROR_NONE) goto done; - if (h->http_parser.state == GRPC_HTTP_BODY) { + if (handshaker->http_parser.state == GRPC_HTTP_BODY) { + // We've gotten back a successul response, so stop the timeout timer. + grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer); // Remove the data we've already read from the read buffer, // leaving only the leftover bytes (if any). gpr_slice_buffer tmp_buffer; gpr_slice_buffer_init(&tmp_buffer); - if (body_start_offset < GPR_SLICE_LENGTH(h->read_buffer->slices[i])) { - gpr_slice_buffer_add(&tmp_buffer, - gpr_slice_split_tail(&h->read_buffer->slices[i], - body_start_offset)); + if (body_start_offset < + GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) { + gpr_slice_buffer_add( + &tmp_buffer, + gpr_slice_split_tail(&handshaker->read_buffer->slices[i], + body_start_offset)); } - gpr_slice_buffer_addn(&tmp_buffer, &h->read_buffer->slices[i + 1], - h->read_buffer->count - i - 1); - gpr_slice_buffer_swap(h->read_buffer, &tmp_buffer); + gpr_slice_buffer_addn(&tmp_buffer, + &handshaker->read_buffer->slices[i + 1], + handshaker->read_buffer->count - i - 1); + gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer); gpr_slice_buffer_destroy(&tmp_buffer); break; } @@ -124,23 +155,25 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, // need to fix the HTTP parser to understand when the body is // complete (e.g., handling chunked transfer encoding or looking // at the Content-Length: header). - if (h->http_parser.state != GRPC_HTTP_BODY) { - gpr_slice_buffer_reset_and_unref(h->read_buffer); - grpc_endpoint_read(exec_ctx, h->endpoint, h->read_buffer, - &h->response_read_closure); + if (handshaker->http_parser.state != GRPC_HTTP_BODY) { + gpr_slice_buffer_reset_and_unref(handshaker->read_buffer); + grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + &handshaker->response_read_closure); return; } // Make sure we got a 2xx response. - if (h->http_response.status < 200 || h->http_response.status >= 300) { + if (handshaker->http_response.status < 200 || + handshaker->http_response.status >= 300) { char* msg; gpr_asprintf(&msg, "HTTP proxy returned response code %d", - h->http_response.status); + handshaker->http_response.status); error = GRPC_ERROR_CREATE(msg); gpr_free(msg); } done: // Invoke handshake-done callback. - h->cb(exec_ctx, h->endpoint, h->args, h->read_buffer, h->user_data, error); + handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, + handshaker->read_buffer, handshaker->user_data, error); } // @@ -148,51 +181,45 @@ done: // static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) { - http_connect_handshaker* h = (http_connect_handshaker*)handshaker; - gpr_free(h->proxy_server); - gpr_free(h->server_name); - gpr_slice_buffer_destroy(&h->write_buffer); - grpc_http_parser_destroy(&h->http_parser); - grpc_http_response_destroy(&h->http_response); - gpr_free(h); + grpc_handshaker* handshaker_in) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + http_connect_handshaker_unref(handshaker); } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker) {} -// FIXME BEFORE MERGING: apply deadline static void http_connect_handshaker_do_handshake( - grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, + grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in, grpc_endpoint* endpoint, grpc_channel_args* args, gpr_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data) { - http_connect_handshaker* h = (http_connect_handshaker*)handshaker; + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; // Save state in the handshaker object. - h->endpoint = endpoint; - h->args = args; - h->cb = cb; - h->user_data = user_data; - // Initialize fields. - gpr_slice_buffer_init(&h->write_buffer); - h->read_buffer = read_buffer; - grpc_closure_init(&h->request_done_closure, on_write_done, h); - grpc_closure_init(&h->response_read_closure, on_read_done, h); - grpc_http_parser_init(&h->http_parser, GRPC_HTTP_RESPONSE, &h->http_response); + handshaker->endpoint = endpoint; + handshaker->args = args; + handshaker->cb = cb; + handshaker->user_data = user_data; + handshaker->read_buffer = read_buffer; // Send HTTP CONNECT request. - gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", h->server_name, - h->proxy_server); + gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", + handshaker->server_name, handshaker->proxy_server); grpc_httpcli_request request; memset(&request, 0, sizeof(request)); - request.host = h->proxy_server; + request.host = handshaker->proxy_server; request.http.method = "CONNECT"; - request.http.path = h->server_name; + request.http.path = handshaker->server_name; request.handshaker = &grpc_httpcli_plaintext; gpr_slice request_slice = grpc_httpcli_format_connect_request(&request); - gpr_slice_buffer_add(&h->write_buffer, request_slice); - grpc_endpoint_write(exec_ctx, endpoint, &h->write_buffer, - &h->request_done_closure); + gpr_slice_buffer_add(&handshaker->write_buffer, request_slice); + grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer, + &handshaker->request_done_closure); + // Set timeout timer. The timer gets a reference to the handshaker. + gpr_ref(&handshaker->refcount); + grpc_timer_init(exec_ctx, &handshaker->timeout_timer, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC)); } static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = { @@ -209,5 +236,13 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); handshaker->proxy_server = gpr_strdup(proxy_server); handshaker->server_name = gpr_strdup(server_name); + gpr_slice_buffer_init(&handshaker->write_buffer); + grpc_closure_init(&handshaker->request_done_closure, on_write_done, + handshaker); + grpc_closure_init(&handshaker->response_read_closure, on_read_done, + handshaker); + grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, + &handshaker->http_response); + gpr_ref_init(&handshaker->refcount, 1); return (grpc_handshaker*)handshaker; } |