aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_config/http_connect_handshaker.c
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-08-04 13:41:46 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-08-04 13:41:46 -0700
commit5b4768f6f407662135e95d1c696593c40af93d4d (patch)
tree583342aa7b1e3ac91987918de205180eeeec3fe5 /src/core/ext/client_config/http_connect_handshaker.c
parent0a05ab6e8e8e605f0fe4f2d0f4373d660bbdfe06 (diff)
Implement timeout support.
Diffstat (limited to 'src/core/ext/client_config/http_connect_handshaker.c')
-rw-r--r--src/core/ext/client_config/http_connect_handshaker.c139
1 files changed, 87 insertions, 52 deletions
diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c
index 54f592ef61..82d84da580 100644
--- a/src/core/ext/client_config/http_connect_handshaker.c
+++ b/src/core/ext/client_config/http_connect_handshaker.c
@@ -42,6 +42,7 @@
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/timer.h"
typedef struct http_connect_handshaker {
// Base class. Must be first.
@@ -63,51 +64,81 @@ typedef struct http_connect_handshaker {
grpc_closure response_read_closure;
grpc_http_parser http_parser;
grpc_http_response http_response;
+ grpc_timer timeout_timer;
+
+ gpr_refcount refcount;
} http_connect_handshaker;
+// Unref and clean up handshaker.
+static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
+ if (gpr_unref(&handshaker->refcount)) {
+ gpr_free(handshaker->proxy_server);
+ gpr_free(handshaker->server_name);
+ gpr_slice_buffer_destroy(&handshaker->write_buffer);
+ grpc_http_parser_destroy(&handshaker->http_parser);
+ grpc_http_response_destroy(&handshaker->http_response);
+ gpr_free(handshaker);
+ }
+}
+
+// Callback invoked when deadline is exceeded.
+static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error == GRPC_ERROR_NONE) // Timer fired, rather than being cancelled.
+ grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint);
+ http_connect_handshaker_unref(handshaker);
+}
+
// Callback invoked when finished writing HTTP CONNECT request.
static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- http_connect_handshaker* h = arg;
+ http_connect_handshaker* handshaker = arg;
if (error != GRPC_ERROR_NONE) {
// If the write failed, invoke the callback immediately with the error.
- h->cb(exec_ctx, h->endpoint, h->args, h->read_buffer, h->user_data,
- GRPC_ERROR_REF(error));
+ handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
+ handshaker->read_buffer, handshaker->user_data,
+ GRPC_ERROR_REF(error));
} else {
// Otherwise, read the response.
- grpc_endpoint_read(exec_ctx, h->endpoint, h->read_buffer,
- &h->response_read_closure);
+ grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ &handshaker->response_read_closure);
}
}
// Callback invoked for reading HTTP CONNECT response.
static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- http_connect_handshaker* h = arg;
+ http_connect_handshaker* handshaker = arg;
if (error != GRPC_ERROR_NONE) {
GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback.
goto done;
}
// Add buffer to parser.
- for (size_t i = 0; i < h->read_buffer->count; ++i) {
- if (GPR_SLICE_LENGTH(h->read_buffer->slices[i]) > 0) {
+ for (size_t i = 0; i < handshaker->read_buffer->count; ++i) {
+ if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) {
size_t body_start_offset = 0;
- error = grpc_http_parser_parse(&h->http_parser, h->read_buffer->slices[i],
+ error = grpc_http_parser_parse(&handshaker->http_parser,
+ handshaker->read_buffer->slices[i],
&body_start_offset);
if (error != GRPC_ERROR_NONE) goto done;
- if (h->http_parser.state == GRPC_HTTP_BODY) {
+ if (handshaker->http_parser.state == GRPC_HTTP_BODY) {
+ // We've gotten back a successul response, so stop the timeout timer.
+ grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer);
// Remove the data we've already read from the read buffer,
// leaving only the leftover bytes (if any).
gpr_slice_buffer tmp_buffer;
gpr_slice_buffer_init(&tmp_buffer);
- if (body_start_offset < GPR_SLICE_LENGTH(h->read_buffer->slices[i])) {
- gpr_slice_buffer_add(&tmp_buffer,
- gpr_slice_split_tail(&h->read_buffer->slices[i],
- body_start_offset));
+ if (body_start_offset <
+ GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) {
+ gpr_slice_buffer_add(
+ &tmp_buffer,
+ gpr_slice_split_tail(&handshaker->read_buffer->slices[i],
+ body_start_offset));
}
- gpr_slice_buffer_addn(&tmp_buffer, &h->read_buffer->slices[i + 1],
- h->read_buffer->count - i - 1);
- gpr_slice_buffer_swap(h->read_buffer, &tmp_buffer);
+ gpr_slice_buffer_addn(&tmp_buffer,
+ &handshaker->read_buffer->slices[i + 1],
+ handshaker->read_buffer->count - i - 1);
+ gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer);
gpr_slice_buffer_destroy(&tmp_buffer);
break;
}
@@ -124,23 +155,25 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
// need to fix the HTTP parser to understand when the body is
// complete (e.g., handling chunked transfer encoding or looking
// at the Content-Length: header).
- if (h->http_parser.state != GRPC_HTTP_BODY) {
- gpr_slice_buffer_reset_and_unref(h->read_buffer);
- grpc_endpoint_read(exec_ctx, h->endpoint, h->read_buffer,
- &h->response_read_closure);
+ if (handshaker->http_parser.state != GRPC_HTTP_BODY) {
+ gpr_slice_buffer_reset_and_unref(handshaker->read_buffer);
+ grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ &handshaker->response_read_closure);
return;
}
// Make sure we got a 2xx response.
- if (h->http_response.status < 200 || h->http_response.status >= 300) {
+ if (handshaker->http_response.status < 200 ||
+ handshaker->http_response.status >= 300) {
char* msg;
gpr_asprintf(&msg, "HTTP proxy returned response code %d",
- h->http_response.status);
+ handshaker->http_response.status);
error = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
}
done:
// Invoke handshake-done callback.
- h->cb(exec_ctx, h->endpoint, h->args, h->read_buffer, h->user_data, error);
+ handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
+ handshaker->read_buffer, handshaker->user_data, error);
}
//
@@ -148,51 +181,45 @@ done:
//
static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker) {
- http_connect_handshaker* h = (http_connect_handshaker*)handshaker;
- gpr_free(h->proxy_server);
- gpr_free(h->server_name);
- gpr_slice_buffer_destroy(&h->write_buffer);
- grpc_http_parser_destroy(&h->http_parser);
- grpc_http_response_destroy(&h->http_response);
- gpr_free(h);
+ grpc_handshaker* handshaker_in) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ http_connect_handshaker_unref(handshaker);
}
static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) {}
-// FIXME BEFORE MERGING: apply deadline
static void http_connect_handshaker_do_handshake(
- grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
+ grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in,
grpc_endpoint* endpoint, grpc_channel_args* args,
gpr_slice_buffer* read_buffer, gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb,
void* user_data) {
- http_connect_handshaker* h = (http_connect_handshaker*)handshaker;
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
// Save state in the handshaker object.
- h->endpoint = endpoint;
- h->args = args;
- h->cb = cb;
- h->user_data = user_data;
- // Initialize fields.
- gpr_slice_buffer_init(&h->write_buffer);
- h->read_buffer = read_buffer;
- grpc_closure_init(&h->request_done_closure, on_write_done, h);
- grpc_closure_init(&h->response_read_closure, on_read_done, h);
- grpc_http_parser_init(&h->http_parser, GRPC_HTTP_RESPONSE, &h->http_response);
+ handshaker->endpoint = endpoint;
+ handshaker->args = args;
+ handshaker->cb = cb;
+ handshaker->user_data = user_data;
+ handshaker->read_buffer = read_buffer;
// Send HTTP CONNECT request.
- gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", h->server_name,
- h->proxy_server);
+ gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
+ handshaker->server_name, handshaker->proxy_server);
grpc_httpcli_request request;
memset(&request, 0, sizeof(request));
- request.host = h->proxy_server;
+ request.host = handshaker->proxy_server;
request.http.method = "CONNECT";
- request.http.path = h->server_name;
+ request.http.path = handshaker->server_name;
request.handshaker = &grpc_httpcli_plaintext;
gpr_slice request_slice = grpc_httpcli_format_connect_request(&request);
- gpr_slice_buffer_add(&h->write_buffer, request_slice);
- grpc_endpoint_write(exec_ctx, endpoint, &h->write_buffer,
- &h->request_done_closure);
+ gpr_slice_buffer_add(&handshaker->write_buffer, request_slice);
+ grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer,
+ &handshaker->request_done_closure);
+ // Set timeout timer. The timer gets a reference to the handshaker.
+ gpr_ref(&handshaker->refcount);
+ grpc_timer_init(exec_ctx, &handshaker->timeout_timer,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC));
}
static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = {
@@ -209,5 +236,13 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
handshaker->proxy_server = gpr_strdup(proxy_server);
handshaker->server_name = gpr_strdup(server_name);
+ gpr_slice_buffer_init(&handshaker->write_buffer);
+ grpc_closure_init(&handshaker->request_done_closure, on_write_done,
+ handshaker);
+ grpc_closure_init(&handshaker->response_read_closure, on_read_done,
+ handshaker);
+ grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE,
+ &handshaker->http_response);
+ gpr_ref_init(&handshaker->refcount, 1);
return (grpc_handshaker*)handshaker;
}