/* * * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include #include #include #include #include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/client_channel/connector.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/security/transport/security_connector.h" typedef struct { grpc_connector base; gpr_mu mu; gpr_refcount refs; bool shutdown; grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; grpc_closure initial_string_sent; grpc_slice_buffer initial_string_buffer; grpc_endpoint *endpoint; // Non-NULL until handshaking starts. grpc_closure connected; grpc_handshake_manager *handshake_mgr; } chttp2_connector; static void chttp2_connector_ref(grpc_connector *con) { chttp2_connector *c = (chttp2_connector *)con; gpr_ref(&c->refs); } static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { chttp2_connector *c = (chttp2_connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ gpr_mu_destroy(&c->mu); grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); // If handshaking is not yet in progress, destroy the endpoint. // Otherwise, the handshaker will do this for us. if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint); gpr_free(c); } } static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) { chttp2_connector *c = (chttp2_connector *)con; gpr_mu_lock(&c->mu); c->shutdown = true; grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr); // If handshaking is not yet in progress, shutdown the endpoint. // Otherwise, the handshaker will do this for us. if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); gpr_mu_unlock(&c->mu); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; chttp2_connector *c = args->user_data; gpr_mu_lock(&c->mu); if (error != GRPC_ERROR_NONE || c->shutdown) { if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE("connector shutdown"); // We were shut down after handshaking completed successfully, so // destroy the endpoint here. // TODO(ctiller): It is currently necessary to shutdown endpoints // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. grpc_endpoint_shutdown(exec_ctx, args->endpoint); grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(args->args); grpc_slice_buffer_destroy(args->read_buffer); gpr_free(args->read_buffer); } else { error = GRPC_ERROR_REF(error); } memset(c->result, 0, sizeof(*c->result)); } else { c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); GPR_ASSERT(c->result->transport); grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, args->read_buffer); c->result->channel_args = args->args; } grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, (grpc_connector*)c); } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { chttp2_connector *c = arg; gpr_mu_lock(&c->mu); if (error != GRPC_ERROR_NONE || c->shutdown) { if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE("connector shutdown"); } else { error = GRPC_ERROR_REF(error); } memset(c->result, 0, sizeof(*c->result)); grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); } else { grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, NULL /* acceptor */, on_handshake_done, c); c->endpoint = NULL; // Endpoint handed off to handshake manager. gpr_mu_unlock(&c->mu); } } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { chttp2_connector *c = arg; gpr_mu_lock(&c->mu); if (error != GRPC_ERROR_NONE || c->shutdown) { if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE("connector shutdown"); } else { error = GRPC_ERROR_REF(error); } memset(c->result, 0, sizeof(*c->result)); grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); } else { GPR_ASSERT(c->endpoint != NULL); if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, c); grpc_slice_buffer_init(&c->initial_string_buffer); grpc_slice_buffer_add(&c->initial_string_buffer, c->args.initial_connect_string); grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer, &c->initial_string_sent); } else { grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, NULL /* acceptor */, on_handshake_done, c); c->endpoint = NULL; // Endpoint handed off to handshake manager. } gpr_mu_unlock(&c->mu); } } static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, grpc_closure *notify) { chttp2_connector *c = (chttp2_connector *)con; gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); c->notify = notify; c->args = *args; c->result = result; GPR_ASSERT(c->endpoint == NULL); chttp2_connector_ref(con); // Ref taken for callback. grpc_closure_init(&c->connected, connected, c); grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, args->interested_parties, args->channel_args, args->addr, args->deadline); gpr_mu_unlock(&c->mu); } static const grpc_connector_vtable chttp2_connector_vtable = { chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown, chttp2_connector_connect}; grpc_connector *grpc_chttp2_connector_create( grpc_exec_ctx *exec_ctx, const char* server_name, void (*create_handshakers)(grpc_exec_ctx* exec_ctx, void* user_data, grpc_handshake_manager* handshake_mgr), void* user_data) { chttp2_connector *c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); c->base.vtable = &chttp2_connector_vtable; gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); c->handshake_mgr = grpc_handshake_manager_create(); char *proxy_name = grpc_get_http_proxy_server(); if (proxy_name != NULL) { grpc_handshake_manager_add( c->handshake_mgr, grpc_http_connect_handshaker_create(proxy_name, server_name)); gpr_free(proxy_name); } if (create_handshakers != NULL) { create_handshakers(exec_ctx, user_data, c->handshake_mgr); } return &c->base; }