aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_channel/http_connect_handshaker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_channel/http_connect_handshaker.c')
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.c235
1 files changed, 153 insertions, 82 deletions
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c
index ea2cbbdd97..76c78ee853 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/client_channel/http_connect_handshaker.c
@@ -35,15 +35,17 @@
#include <string.h>
+#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h>
+#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
-#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
typedef struct http_connect_handshaker {
@@ -51,60 +53,105 @@ typedef struct http_connect_handshaker {
grpc_handshaker base;
char* proxy_server;
- char* server_name;
+
+ gpr_refcount refcount;
+ gpr_mu mu;
+
+ bool shutdown;
+ // Endpoint and read buffer to destroy after a shutdown.
+ grpc_endpoint* endpoint_to_destroy;
+ grpc_slice_buffer* read_buffer_to_destroy;
// State saved while performing the handshake.
- grpc_endpoint* endpoint;
- grpc_channel_args* args;
- grpc_handshaker_done_cb cb;
- void* user_data;
+ grpc_handshaker_args* args;
+ grpc_closure* on_handshake_done;
// Objects for processing the HTTP CONNECT request and response.
- gpr_slice_buffer write_buffer;
- gpr_slice_buffer* read_buffer; // Ownership passes through this object.
+ grpc_slice_buffer write_buffer;
grpc_closure request_done_closure;
grpc_closure response_read_closure;
grpc_http_parser http_parser;
grpc_http_response http_response;
- grpc_timer timeout_timer;
-
- gpr_refcount refcount;
} http_connect_handshaker;
// Unref and clean up handshaker.
-static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
+static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
+ http_connect_handshaker* handshaker) {
if (gpr_unref(&handshaker->refcount)) {
+ gpr_mu_destroy(&handshaker->mu);
+ if (handshaker->endpoint_to_destroy != NULL) {
+ grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy);
+ }
+ if (handshaker->read_buffer_to_destroy != NULL) {
+ grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy);
+ gpr_free(handshaker->read_buffer_to_destroy);
+ }
gpr_free(handshaker->proxy_server);
- gpr_free(handshaker->server_name);
- gpr_slice_buffer_destroy(&handshaker->write_buffer);
+ grpc_slice_buffer_destroy(&handshaker->write_buffer);
grpc_http_parser_destroy(&handshaker->http_parser);
grpc_http_response_destroy(&handshaker->http_response);
gpr_free(handshaker);
}
}
-// Callback invoked when deadline is exceeded.
-static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
- http_connect_handshaker* handshaker = arg;
- if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
- grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint);
+// Set args fields to NULL, saving the endpoint and read buffer for
+// later destruction.
+static void cleanup_args_for_failure_locked(
+ http_connect_handshaker* handshaker) {
+ handshaker->endpoint_to_destroy = handshaker->args->endpoint;
+ handshaker->args->endpoint = NULL;
+ handshaker->read_buffer_to_destroy = handshaker->args->read_buffer;
+ handshaker->args->read_buffer = NULL;
+ grpc_channel_args_destroy(handshaker->args->args);
+ handshaker->args->args = NULL;
+}
+
+// If the handshake failed or we're shutting down, clean up and invoke the
+// callback with the error.
+static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
+ http_connect_handshaker* handshaker,
+ grpc_error* error) {
+ if (error == GRPC_ERROR_NONE) {
+ // If we were shut down after an endpoint operation succeeded but
+ // before the endpoint callback was invoked, we need to generate our
+ // own error.
+ error = GRPC_ERROR_CREATE("Handshaker shutdown");
+ }
+ if (!handshaker->shutdown) {
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+ // Not shutting down, so the handshake failed. Clean up before
+ // invoking the callback.
+ cleanup_args_for_failure_locked(handshaker);
+ // Set shutdown to true so that subsequent calls to
+ // http_connect_handshaker_shutdown() do nothing.
+ handshaker->shutdown = true;
}
- http_connect_handshaker_unref(handshaker);
+ // Invoke callback.
+ grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
}
// Callback invoked when finished writing HTTP CONNECT request.
static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
http_connect_handshaker* handshaker = arg;
- if (error != GRPC_ERROR_NONE) {
- // If the write failed, invoke the callback immediately with the error.
- handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
- handshaker->read_buffer, handshaker->user_data,
- GRPC_ERROR_REF(error));
+ gpr_mu_lock(&handshaker->mu);
+ if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
+ // If the write failed or we're shutting down, clean up and invoke the
+ // callback with the error.
+ handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error));
+ gpr_mu_unlock(&handshaker->mu);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
} else {
// Otherwise, read the response.
- grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ // The read callback inherits our ref to the handshaker.
+ grpc_endpoint_read(exec_ctx, handshaker->args->endpoint,
+ handshaker->args->read_buffer,
&handshaker->response_read_closure);
+ gpr_mu_unlock(&handshaker->mu);
}
}
@@ -112,37 +159,41 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
http_connect_handshaker* handshaker = arg;
- if (error != GRPC_ERROR_NONE) {
- GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback.
+ gpr_mu_lock(&handshaker->mu);
+ if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
+ // If the read failed or we're shutting down, clean up and invoke the
+ // callback with the error.
+ handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error));
goto done;
}
// Add buffer to parser.
- for (size_t i = 0; i < handshaker->read_buffer->count; ++i) {
- if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) {
+ for (size_t i = 0; i < handshaker->args->read_buffer->count; ++i) {
+ if (GRPC_SLICE_LENGTH(handshaker->args->read_buffer->slices[i]) > 0) {
size_t body_start_offset = 0;
error = grpc_http_parser_parse(&handshaker->http_parser,
- handshaker->read_buffer->slices[i],
+ handshaker->args->read_buffer->slices[i],
&body_start_offset);
- if (error != GRPC_ERROR_NONE) goto done;
+ if (error != GRPC_ERROR_NONE) {
+ handshake_failed_locked(exec_ctx, handshaker, error);
+ goto done;
+ }
if (handshaker->http_parser.state == GRPC_HTTP_BODY) {
- // We've gotten back a successul response, so stop the timeout timer.
- grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer);
// Remove the data we've already read from the read buffer,
// leaving only the leftover bytes (if any).
- gpr_slice_buffer tmp_buffer;
- gpr_slice_buffer_init(&tmp_buffer);
+ grpc_slice_buffer tmp_buffer;
+ grpc_slice_buffer_init(&tmp_buffer);
if (body_start_offset <
- GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) {
- gpr_slice_buffer_add(
+ GRPC_SLICE_LENGTH(handshaker->args->read_buffer->slices[i])) {
+ grpc_slice_buffer_add(
&tmp_buffer,
- gpr_slice_split_tail(&handshaker->read_buffer->slices[i],
- body_start_offset));
+ grpc_slice_split_tail(&handshaker->args->read_buffer->slices[i],
+ body_start_offset));
}
- gpr_slice_buffer_addn(&tmp_buffer,
- &handshaker->read_buffer->slices[i + 1],
- handshaker->read_buffer->count - i - 1);
- gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer);
- gpr_slice_buffer_destroy(&tmp_buffer);
+ grpc_slice_buffer_addn(&tmp_buffer,
+ &handshaker->args->read_buffer->slices[i + 1],
+ handshaker->args->read_buffer->count - i - 1);
+ grpc_slice_buffer_swap(handshaker->args->read_buffer, &tmp_buffer);
+ grpc_slice_buffer_destroy(&tmp_buffer);
break;
}
}
@@ -159,9 +210,11 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
// complete (e.g., handling chunked transfer encoding or looking
// at the Content-Length: header).
if (handshaker->http_parser.state != GRPC_HTTP_BODY) {
- gpr_slice_buffer_reset_and_unref(handshaker->read_buffer);
- grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ grpc_slice_buffer_reset_and_unref(handshaker->args->read_buffer);
+ grpc_endpoint_read(exec_ctx, handshaker->args->endpoint,
+ handshaker->args->read_buffer,
&handshaker->response_read_closure);
+ gpr_mu_unlock(&handshaker->mu);
return;
}
// Make sure we got a 2xx response.
@@ -172,11 +225,17 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
handshaker->http_response.status);
error = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
+ handshake_failed_locked(exec_ctx, handshaker, error);
+ goto done;
}
+ // Success. Invoke handshake-done callback.
+ grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
done:
- // Invoke handshake-done callback.
- handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
- handshaker->read_buffer, handshaker->user_data, error);
+ // Set shutdown to true so that subsequent calls to
+ // http_connect_handshaker_shutdown() do nothing.
+ handshaker->shutdown = true;
+ gpr_mu_unlock(&handshaker->mu);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
}
//
@@ -186,67 +245,79 @@ done:
static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker_in) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
- http_connect_handshaker_unref(handshaker);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
}
static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker) {}
+ grpc_handshaker* handshaker_in) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ gpr_mu_lock(&handshaker->mu);
+ if (!handshaker->shutdown) {
+ handshaker->shutdown = true;
+ grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+ cleanup_args_for_failure_locked(handshaker);
+ }
+ gpr_mu_unlock(&handshaker->mu);
+}
static void http_connect_handshaker_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in,
- grpc_endpoint* endpoint, grpc_channel_args* args,
- gpr_slice_buffer* read_buffer, gpr_timespec deadline,
- grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb,
- void* user_data) {
+ grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ // Get server name from channel args.
+ const grpc_arg* arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(arg != NULL);
+ GPR_ASSERT(arg->type == GRPC_ARG_STRING);
+ char* canonical_uri =
+ grpc_resolver_factory_add_default_prefix_if_needed(arg->value.string);
+ grpc_uri* uri = grpc_uri_parse(canonical_uri, 1);
+ char* server_name = uri->path;
+ if (server_name[0] == '/') ++server_name;
// Save state in the handshaker object.
- handshaker->endpoint = endpoint;
+ gpr_mu_lock(&handshaker->mu);
handshaker->args = args;
- handshaker->cb = cb;
- handshaker->user_data = user_data;
- handshaker->read_buffer = read_buffer;
+ handshaker->on_handshake_done = on_handshake_done;
// Send HTTP CONNECT request.
- gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
- handshaker->server_name, handshaker->proxy_server);
+ gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", server_name,
+ handshaker->proxy_server);
grpc_httpcli_request request;
memset(&request, 0, sizeof(request));
- request.host = handshaker->proxy_server;
+ request.host = server_name;
request.http.method = "CONNECT";
- request.http.path = handshaker->server_name;
+ request.http.path = server_name;
request.handshaker = &grpc_httpcli_plaintext;
- gpr_slice request_slice = grpc_httpcli_format_connect_request(&request);
- gpr_slice_buffer_add(&handshaker->write_buffer, request_slice);
- grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer,
- &handshaker->request_done_closure);
- // Set timeout timer. The timer gets a reference to the handshaker.
+ grpc_slice request_slice = grpc_httpcli_format_connect_request(&request);
+ grpc_slice_buffer_add(&handshaker->write_buffer, request_slice);
+ // Take a new ref to be held by the write callback.
gpr_ref(&handshaker->refcount);
- grpc_timer_init(exec_ctx, &handshaker->timeout_timer,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer,
+ &handshaker->request_done_closure);
+ gpr_mu_unlock(&handshaker->mu);
+ // Clean up.
+ gpr_free(canonical_uri);
+ grpc_uri_destroy(uri);
}
-static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = {
+static const grpc_handshaker_vtable http_connect_handshaker_vtable = {
http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
http_connect_handshaker_do_handshake};
-grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
- const char* server_name) {
+grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) {
GPR_ASSERT(proxy_server != NULL);
- GPR_ASSERT(server_name != NULL);
- http_connect_handshaker* handshaker =
- gpr_malloc(sizeof(http_connect_handshaker));
+ http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker));
memset(handshaker, 0, sizeof(*handshaker));
grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
+ gpr_mu_init(&handshaker->mu);
+ gpr_ref_init(&handshaker->refcount, 1);
handshaker->proxy_server = gpr_strdup(proxy_server);
- handshaker->server_name = gpr_strdup(server_name);
- gpr_slice_buffer_init(&handshaker->write_buffer);
+ grpc_slice_buffer_init(&handshaker->write_buffer);
grpc_closure_init(&handshaker->request_done_closure, on_write_done,
handshaker);
grpc_closure_init(&handshaker->response_read_closure, on_read_done,
handshaker);
grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE,
&handshaker->http_response);
- gpr_ref_init(&handshaker->refcount, 1);
return &handshaker->base;
}