From 3cfc5a7b1d8c428d50c9be537b47ad543bff3a40 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 27 Jul 2016 07:48:39 -0700 Subject: Made significant progress on the test, but not working yet. --- test/core/end2end/fixtures/h2_http_proxy.c | 21 +- test/core/end2end/fixtures/http_proxy.c | 351 +++++++++++++++++++++++++++++ test/core/end2end/fixtures/http_proxy.h | 41 ++++ 3 files changed, 404 insertions(+), 9 deletions(-) create mode 100644 test/core/end2end/fixtures/http_proxy.c create mode 100644 test/core/end2end/fixtures/http_proxy.h (limited to 'test/core') diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c index 4578d753f6..d84f0b8cb9 100644 --- a/test/core/end2end/fixtures/h2_http_proxy.c +++ b/test/core/end2end/fixtures/h2_http_proxy.c @@ -48,21 +48,24 @@ #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/server.h" +#include "test/core/end2end/fixtures/http_proxy.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" typedef struct fullstack_fixture_data { - char *localaddr; + char *server_addr; + grpc_end2end_http_proxy *proxy; } fullstack_fixture_data; static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( grpc_channel_args *client_args, grpc_channel_args *server_args) { grpc_end2end_test_fixture f; - int port = grpc_pick_unused_port_or_die(); - fullstack_fixture_data *ffd = gpr_malloc(sizeof(fullstack_fixture_data)); memset(&f, 0, sizeof(f)); - gpr_join_host_port(&ffd->localaddr, "localhost", port); + fullstack_fixture_data *ffd = gpr_malloc(sizeof(fullstack_fixture_data)); + const int server_port = grpc_pick_unused_port_or_die(); + gpr_join_host_port(&ffd->server_addr, "localhost", server_port); + ffd->proxy = grpc_end2end_http_proxy_create(); f.fixture_data = ffd; f.cq = grpc_completion_queue_create(NULL); @@ -73,10 +76,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f, grpc_channel_args *client_args) { fullstack_fixture_data *ffd = f->fixture_data; -// FIXME: this requires a separate proxy running at localhost:9999. need to -// change this test to provide its own proxy. char *target_uri; - gpr_asprintf(&target_uri, "%s?http_proxy=127.0.0.1:9999", ffd->localaddr); + gpr_asprintf(&target_uri, "%s?http_proxy=%s", ffd->server_addr, + grpc_end2end_http_proxy_get_proxy_name(ffd->proxy)); gpr_log(GPR_INFO, "target_uri: %s", target_uri); f->client = grpc_insecure_channel_create(target_uri, client_args, NULL); gpr_free(target_uri); @@ -91,13 +93,14 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f, } f->server = grpc_server_create(server_args, NULL); grpc_server_register_completion_queue(f->server, f->cq, NULL); - GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr)); + GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->server_addr)); grpc_server_start(f->server); } void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) { fullstack_fixture_data *ffd = f->fixture_data; - gpr_free(ffd->localaddr); + gpr_free(ffd->server_addr); + grpc_end2end_http_proxy_destroy(ffd->proxy); gpr_free(ffd); } diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c new file mode 100644 index 0000000000..a8d68f0249 --- /dev/null +++ b/test/core/end2end/fixtures/http_proxy.c @@ -0,0 +1,351 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/fixtures/http_proxy.h" + +#include + +#include +#include +#include +#include +#include +#include +//#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_server.h" +#include "test/core/util/port.h" + +// +// Connection handling +// + +typedef struct connection_data { + grpc_endpoint* client_endpoint; + grpc_endpoint* server_endpoint; + + grpc_pollset_set* pollset_set; + + grpc_closure on_read_request_done; + grpc_closure on_server_connect_done; + grpc_closure on_write_response_done; + grpc_closure on_client_read_done; + grpc_closure on_client_write_done; + grpc_closure on_server_read_done; + grpc_closure on_server_write_done; + + gpr_slice_buffer client_read_buffer; + gpr_slice_buffer client_write_buffer; + gpr_slice_buffer server_read_buffer; + gpr_slice_buffer server_write_buffer; + + grpc_http_parser http_parser; + grpc_http_request http_request; +} connection_data; + +static void connection_data_destroy(grpc_exec_ctx* exec_ctx, + connection_data* cd) { + grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); + if (cd->server_endpoint != NULL) + grpc_endpoint_destroy(exec_ctx, cd->server_endpoint); + grpc_pollset_set_destroy(cd->pollset_set); + gpr_slice_buffer_destroy(&cd->client_read_buffer); + gpr_slice_buffer_destroy(&cd->client_write_buffer); + gpr_slice_buffer_destroy(&cd->server_read_buffer); + gpr_slice_buffer_destroy(&cd->server_write_buffer); + grpc_http_parser_destroy(&cd->http_parser); + grpc_http_request_destroy(&cd->http_request); + gpr_free(cd); +} + +static void connection_data_failed(grpc_exec_ctx* exec_ctx, + connection_data* cd, const char* prefix, + grpc_error* error) { + const char* msg = grpc_error_string(error); + gpr_log(GPR_ERROR, "%s: %s", prefix, msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(error); + grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); + if (cd->server_endpoint != NULL) + grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); + connection_data_destroy(exec_ctx, cd); +} + +static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) + connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); +} + +static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) + connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); +} + +static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); + return; + } + gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); + grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, + &cd->on_server_write_done); +} + +static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); + return; + } + gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); + grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, + &cd->on_client_write_done); +} + +static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); + return; + } + // Set up proxying. + grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, + &cd->on_client_read_done); + grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, + &cd->on_server_read_done); +} + +static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); + return; + } + // We've established a connection, so send back a 200 response code to + // the client. + gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n"); + gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + gpr_slice_buffer_add(&cd->client_write_buffer, slice); + grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, + &cd->on_write_response_done); +} + +static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); + return; + } + // Read request and feed it to the parser. + for (size_t i = 0; i < cd->client_read_buffer.count; ++i) { + if (GPR_SLICE_LENGTH(cd->client_read_buffer.slices[i]) > 0) { + error = grpc_http_parser_parse( + &cd->http_parser, cd->client_read_buffer.slices[i]); + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy request parse", + error); + return; + } + } + } + gpr_slice_buffer_reset_and_unref(&cd->client_read_buffer); + // If we're not done reading the request, read more data. + if (cd->http_parser.state != GRPC_HTTP_BODY) { + grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, + &cd->on_read_request_done); + return; + } + // Make sure we got a CONNECT request. + if (strcmp(cd->http_request.method, "CONNECT") != 0) { + char* msg; + gpr_asprintf(&msg, "HTTP proxy got request method %s", + cd->http_request.method); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); + return; + } + // Resolve address. + grpc_resolved_addresses* resolved_addresses = NULL; + error = grpc_blocking_resolve_address(cd->http_request.path, "80", + &resolved_addresses); + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy DNS lookup", error); + return; + } + GPR_ASSERT(resolved_addresses->naddrs >= 1); + // Connect to requested address. + const gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(10, GPR_TIMESPAN)); + grpc_tcp_client_connect(exec_ctx, &cd->on_server_connect_done, + &cd->server_endpoint, cd->pollset_set, + (struct sockaddr*)&resolved_addresses->addrs[0].addr, + resolved_addresses->addrs[0].len, deadline); + grpc_resolved_addresses_destroy(resolved_addresses); +} + +static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, + grpc_endpoint* ep, grpc_pollset* accepting_pollset, + grpc_tcp_server_acceptor* acceptor) { +// FIXME: remove +gpr_log(GPR_ERROR, "==> on_accept()"); + // Instantiate connection_data. + connection_data* cd = gpr_malloc(sizeof(*cd)); + memset(cd, 0, sizeof(*cd)); + cd->client_endpoint = ep; + cd->pollset_set = grpc_pollset_set_create(); + grpc_closure_init(&cd->on_read_request_done, on_read_request_done, cd); + grpc_closure_init(&cd->on_server_connect_done, on_server_connect_done, cd); + grpc_closure_init(&cd->on_write_response_done, on_write_response_done, cd); + grpc_closure_init(&cd->on_client_read_done, on_client_read_done, cd); + grpc_closure_init(&cd->on_client_write_done, on_client_write_done, cd); + grpc_closure_init(&cd->on_server_read_done, on_server_read_done, cd); + grpc_closure_init(&cd->on_server_write_done, on_server_write_done, cd); + gpr_slice_buffer_init(&cd->client_read_buffer); + gpr_slice_buffer_init(&cd->client_write_buffer); + gpr_slice_buffer_init(&cd->server_read_buffer); + gpr_slice_buffer_init(&cd->server_write_buffer); + grpc_http_parser_init(&cd->http_parser, GRPC_HTTP_REQUEST, + &cd->http_request); + grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, + &cd->on_read_request_done); +} + +// +// Proxy class +// + +struct grpc_end2end_http_proxy { + char* proxy_name; +// gpr_thd_id thd; + grpc_tcp_server* server; + grpc_channel_args* channel_args; + gpr_mu* mu; + grpc_pollset* pollset; +}; + +#if 0 +static void thread_main(void *arg) { + //grpc_end2end_http_proxy *proxy = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + while (true) { + grpc_exec_ctx_flush(&exec_ctx); + } +} +#endif + +grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { + grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy)); + memset(proxy, 0, sizeof(*proxy)); + // Construct proxy address. + const int proxy_port = grpc_pick_unused_port_or_die(); + gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port); + gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name); +// FIXME: remove +gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); + // Create TCP server. + proxy->channel_args = grpc_channel_args_copy(NULL); + grpc_error* error = grpc_tcp_server_create( + NULL, proxy->channel_args, &proxy->server); + GPR_ASSERT(error == GRPC_ERROR_NONE); + // Bind to port. + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + grpc_sockaddr_set_port((struct sockaddr*)&addr, proxy_port); + int port; + error = grpc_tcp_server_add_port( + proxy->server, (struct sockaddr*)&addr, sizeof(addr), &port); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(port == proxy_port); + // Start server. + proxy->pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(proxy->pollset, &proxy->mu); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, + on_accept, NULL); + grpc_exec_ctx_finish(&exec_ctx); +#if 0 + // Start proxy thread. + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); +#endif + return proxy; +} + +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, + grpc_error *error) { + grpc_pollset_destroy(p); +} + +void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); + grpc_tcp_server_unref(&exec_ctx, proxy->server); +// gpr_thd_join(proxy->thd); + gpr_free(proxy->proxy_name); + grpc_channel_args_destroy(proxy->channel_args); + grpc_closure destroyed; + grpc_closure_init(&destroyed, destroy_pollset, &proxy->pollset); + grpc_pollset_shutdown(&exec_ctx, proxy->pollset, &destroyed); + gpr_free(proxy); + grpc_exec_ctx_finish(&exec_ctx); +} + +const char *grpc_end2end_http_proxy_get_proxy_name( + grpc_end2end_http_proxy *proxy) { + return proxy->proxy_name; +} diff --git a/test/core/end2end/fixtures/http_proxy.h b/test/core/end2end/fixtures/http_proxy.h new file mode 100644 index 0000000000..7af2ea92d0 --- /dev/null +++ b/test/core/end2end/fixtures/http_proxy.h @@ -0,0 +1,41 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +typedef struct grpc_end2end_http_proxy grpc_end2end_http_proxy; + +grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(); + +void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy); + +const char *grpc_end2end_http_proxy_get_proxy_name( + grpc_end2end_http_proxy *proxy); -- cgit v1.2.3