diff options
author | 2016-11-14 12:08:13 -0800 | |
---|---|---|
committer | 2016-11-14 12:08:13 -0800 | |
commit | b16c1e32fd774ab38dc6cbf671c29f676905f889 (patch) | |
tree | d452714a6762e7afe3a694860c3c2575e78f695f /src/core/ext/client_channel/http_connect_handshaker.c | |
parent | 4b5cdb751ebda76b7cd78577f80b58a3bcf0105a (diff) |
Move deadline handling to handshake manager.
Diffstat (limited to 'src/core/ext/client_channel/http_connect_handshaker.c')
-rw-r--r-- | src/core/ext/client_channel/http_connect_handshaker.c | 59 |
1 files changed, 35 insertions, 24 deletions
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index a07efcc6d5..407b8ce023 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -43,7 +43,6 @@ #include "src/core/ext/client_channel/uri_parser.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" typedef struct http_connect_handshaker { @@ -53,6 +52,9 @@ typedef struct http_connect_handshaker { char* proxy_server; char* server_name; + gpr_refcount refcount; + gpr_mu mu; + // State saved while performing the handshake. grpc_handshaker_args* args; grpc_closure* on_handshake_done; @@ -63,14 +65,12 @@ typedef struct http_connect_handshaker { grpc_closure response_read_closure; grpc_http_parser http_parser; grpc_http_response http_response; - grpc_timer timeout_timer; - - gpr_refcount refcount; } http_connect_handshaker; // Unref and clean up handshaker. static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { if (gpr_unref(&handshaker->refcount)) { + gpr_mu_destroy(&handshaker->mu); gpr_free(handshaker->proxy_server); gpr_free(handshaker->server_name); grpc_slice_buffer_destroy(&handshaker->write_buffer); @@ -80,28 +80,27 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { } } -// Callback invoked when deadline is exceeded. -static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - http_connect_handshaker* handshaker = arg; - if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. - grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); - } - http_connect_handshaker_unref(handshaker); -} - // Callback invoked when finished writing HTTP CONNECT request. static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; if (error != GRPC_ERROR_NONE) { // If the write failed, invoke the callback immediately with the error. + gpr_mu_lock(&handshaker->mu); grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, GRPC_ERROR_REF(error), NULL); + handshaker->args = NULL; + gpr_mu_unlock(&handshaker->mu); + http_connect_handshaker_unref(handshaker); } else { // Otherwise, read the response. + // The read callback inherits our ref to the handshaker. + gpr_mu_lock(&handshaker->mu); + GPR_ASSERT(handshaker->args != NULL); grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, handshaker->args->read_buffer, &handshaker->response_read_closure); + gpr_mu_unlock(&handshaker->mu); } } @@ -109,6 +108,8 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; + gpr_mu_lock(&handshaker->mu); + GPR_ASSERT(handshaker->args != NULL); if (error != GRPC_ERROR_NONE) { GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. goto done; @@ -122,8 +123,6 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, &body_start_offset); if (error != GRPC_ERROR_NONE) goto done; if (handshaker->http_parser.state == GRPC_HTTP_BODY) { - // We've gotten back a successul response, so stop the timeout timer. - grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer); // Remove the data we've already read from the read buffer, // leaving only the leftover bytes (if any). grpc_slice_buffer tmp_buffer; @@ -160,6 +159,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, handshaker->args->read_buffer, &handshaker->response_read_closure); + gpr_mu_unlock(&handshaker->mu); return; } // Make sure we got a 2xx response. @@ -173,7 +173,10 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, } done: // Invoke handshake-done callback. + handshaker->args = NULL; grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); + gpr_mu_unlock(&handshaker->mu); + http_connect_handshaker_unref(handshaker); } // @@ -187,13 +190,22 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) {} + grpc_handshaker* handshaker_in) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + gpr_mu_lock(&handshaker->mu); + if (handshaker->args != NULL) { + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + handshaker->args = NULL; + } + gpr_mu_unlock(&handshaker->mu); +} static void http_connect_handshaker_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_closure* on_handshake_done, grpc_handshaker_args* args) { + grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, + grpc_handshaker_args* args) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + gpr_mu_lock(&handshaker->mu); // Save state in the handshaker object. handshaker->args = args; handshaker->on_handshake_done = on_handshake_done; @@ -208,13 +220,11 @@ static void http_connect_handshaker_do_handshake( request.handshaker = &grpc_httpcli_plaintext; grpc_slice request_slice = grpc_httpcli_format_connect_request(&request); grpc_slice_buffer_add(&handshaker->write_buffer, request_slice); + // Take a new ref to be held by the write callback. + gpr_ref(&handshaker->refcount); grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer, &handshaker->request_done_closure); - // Set timeout timer. The timer gets a reference to the handshaker. - gpr_ref(&handshaker->refcount); - grpc_timer_init(exec_ctx, &handshaker->timeout_timer, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_mu_unlock(&handshaker->mu); } static const grpc_handshaker_vtable http_connect_handshaker_vtable = { @@ -228,6 +238,8 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker)); memset(handshaker, 0, sizeof(*handshaker)); grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); + gpr_mu_init(&handshaker->mu); + gpr_ref_init(&handshaker->refcount, 1); handshaker->proxy_server = gpr_strdup(proxy_server); handshaker->server_name = gpr_strdup(server_name); grpc_slice_buffer_init(&handshaker->write_buffer); @@ -237,7 +249,6 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, handshaker); grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, &handshaker->http_response); - gpr_ref_init(&handshaker->refcount, 1); return &handshaker->base; } |