From 3cfc5a7b1d8c428d50c9be537b47ad543bff3a40 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 27 Jul 2016 07:48:39 -0700 Subject: Made significant progress on the test, but not working yet. --- test/core/end2end/fixtures/http_proxy.c | 351 ++++++++++++++++++++++++++++++++ 1 file changed, 351 insertions(+) create mode 100644 test/core/end2end/fixtures/http_proxy.c (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c new file mode 100644 index 0000000000..a8d68f0249 --- /dev/null +++ b/test/core/end2end/fixtures/http_proxy.c @@ -0,0 +1,351 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/fixtures/http_proxy.h" + +#include + +#include +#include +#include +#include +#include +#include +//#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_server.h" +#include "test/core/util/port.h" + +// +// Connection handling +// + +typedef struct connection_data { + grpc_endpoint* client_endpoint; + grpc_endpoint* server_endpoint; + + grpc_pollset_set* pollset_set; + + grpc_closure on_read_request_done; + grpc_closure on_server_connect_done; + grpc_closure on_write_response_done; + grpc_closure on_client_read_done; + grpc_closure on_client_write_done; + grpc_closure on_server_read_done; + grpc_closure on_server_write_done; + + gpr_slice_buffer client_read_buffer; + gpr_slice_buffer client_write_buffer; + gpr_slice_buffer server_read_buffer; + gpr_slice_buffer server_write_buffer; + + grpc_http_parser http_parser; + grpc_http_request http_request; +} connection_data; + +static void connection_data_destroy(grpc_exec_ctx* exec_ctx, + connection_data* cd) { + grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); + if (cd->server_endpoint != NULL) + grpc_endpoint_destroy(exec_ctx, cd->server_endpoint); + grpc_pollset_set_destroy(cd->pollset_set); + gpr_slice_buffer_destroy(&cd->client_read_buffer); + gpr_slice_buffer_destroy(&cd->client_write_buffer); + gpr_slice_buffer_destroy(&cd->server_read_buffer); + gpr_slice_buffer_destroy(&cd->server_write_buffer); + grpc_http_parser_destroy(&cd->http_parser); + grpc_http_request_destroy(&cd->http_request); + gpr_free(cd); +} + +static void connection_data_failed(grpc_exec_ctx* exec_ctx, + connection_data* cd, const char* prefix, + grpc_error* error) { + const char* msg = grpc_error_string(error); + gpr_log(GPR_ERROR, "%s: %s", prefix, msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(error); + grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); + if (cd->server_endpoint != NULL) + grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); + connection_data_destroy(exec_ctx, cd); +} + +static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) + connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); +} + +static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) + connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); +} + +static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); + return; + } + gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); + grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, + &cd->on_server_write_done); +} + +static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); + return; + } + gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); + grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, + &cd->on_client_write_done); +} + +static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); + return; + } + // Set up proxying. + grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, + &cd->on_client_read_done); + grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, + &cd->on_server_read_done); +} + +static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); + return; + } + // We've established a connection, so send back a 200 response code to + // the client. + gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n"); + gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + gpr_slice_buffer_add(&cd->client_write_buffer, slice); + grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, + &cd->on_write_response_done); +} + +static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + connection_data* cd = arg; + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); + return; + } + // Read request and feed it to the parser. + for (size_t i = 0; i < cd->client_read_buffer.count; ++i) { + if (GPR_SLICE_LENGTH(cd->client_read_buffer.slices[i]) > 0) { + error = grpc_http_parser_parse( + &cd->http_parser, cd->client_read_buffer.slices[i]); + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy request parse", + error); + return; + } + } + } + gpr_slice_buffer_reset_and_unref(&cd->client_read_buffer); + // If we're not done reading the request, read more data. + if (cd->http_parser.state != GRPC_HTTP_BODY) { + grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, + &cd->on_read_request_done); + return; + } + // Make sure we got a CONNECT request. + if (strcmp(cd->http_request.method, "CONNECT") != 0) { + char* msg; + gpr_asprintf(&msg, "HTTP proxy got request method %s", + cd->http_request.method); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); + return; + } + // Resolve address. + grpc_resolved_addresses* resolved_addresses = NULL; + error = grpc_blocking_resolve_address(cd->http_request.path, "80", + &resolved_addresses); + if (error != GRPC_ERROR_NONE) { + connection_data_failed(exec_ctx, cd, "HTTP proxy DNS lookup", error); + return; + } + GPR_ASSERT(resolved_addresses->naddrs >= 1); + // Connect to requested address. + const gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(10, GPR_TIMESPAN)); + grpc_tcp_client_connect(exec_ctx, &cd->on_server_connect_done, + &cd->server_endpoint, cd->pollset_set, + (struct sockaddr*)&resolved_addresses->addrs[0].addr, + resolved_addresses->addrs[0].len, deadline); + grpc_resolved_addresses_destroy(resolved_addresses); +} + +static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, + grpc_endpoint* ep, grpc_pollset* accepting_pollset, + grpc_tcp_server_acceptor* acceptor) { +// FIXME: remove +gpr_log(GPR_ERROR, "==> on_accept()"); + // Instantiate connection_data. + connection_data* cd = gpr_malloc(sizeof(*cd)); + memset(cd, 0, sizeof(*cd)); + cd->client_endpoint = ep; + cd->pollset_set = grpc_pollset_set_create(); + grpc_closure_init(&cd->on_read_request_done, on_read_request_done, cd); + grpc_closure_init(&cd->on_server_connect_done, on_server_connect_done, cd); + grpc_closure_init(&cd->on_write_response_done, on_write_response_done, cd); + grpc_closure_init(&cd->on_client_read_done, on_client_read_done, cd); + grpc_closure_init(&cd->on_client_write_done, on_client_write_done, cd); + grpc_closure_init(&cd->on_server_read_done, on_server_read_done, cd); + grpc_closure_init(&cd->on_server_write_done, on_server_write_done, cd); + gpr_slice_buffer_init(&cd->client_read_buffer); + gpr_slice_buffer_init(&cd->client_write_buffer); + gpr_slice_buffer_init(&cd->server_read_buffer); + gpr_slice_buffer_init(&cd->server_write_buffer); + grpc_http_parser_init(&cd->http_parser, GRPC_HTTP_REQUEST, + &cd->http_request); + grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, + &cd->on_read_request_done); +} + +// +// Proxy class +// + +struct grpc_end2end_http_proxy { + char* proxy_name; +// gpr_thd_id thd; + grpc_tcp_server* server; + grpc_channel_args* channel_args; + gpr_mu* mu; + grpc_pollset* pollset; +}; + +#if 0 +static void thread_main(void *arg) { + //grpc_end2end_http_proxy *proxy = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + while (true) { + grpc_exec_ctx_flush(&exec_ctx); + } +} +#endif + +grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { + grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy)); + memset(proxy, 0, sizeof(*proxy)); + // Construct proxy address. + const int proxy_port = grpc_pick_unused_port_or_die(); + gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port); + gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name); +// FIXME: remove +gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); + // Create TCP server. + proxy->channel_args = grpc_channel_args_copy(NULL); + grpc_error* error = grpc_tcp_server_create( + NULL, proxy->channel_args, &proxy->server); + GPR_ASSERT(error == GRPC_ERROR_NONE); + // Bind to port. + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + grpc_sockaddr_set_port((struct sockaddr*)&addr, proxy_port); + int port; + error = grpc_tcp_server_add_port( + proxy->server, (struct sockaddr*)&addr, sizeof(addr), &port); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(port == proxy_port); + // Start server. + proxy->pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(proxy->pollset, &proxy->mu); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, + on_accept, NULL); + grpc_exec_ctx_finish(&exec_ctx); +#if 0 + // Start proxy thread. + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); +#endif + return proxy; +} + +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, + grpc_error *error) { + grpc_pollset_destroy(p); +} + +void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); + grpc_tcp_server_unref(&exec_ctx, proxy->server); +// gpr_thd_join(proxy->thd); + gpr_free(proxy->proxy_name); + grpc_channel_args_destroy(proxy->channel_args); + grpc_closure destroyed; + grpc_closure_init(&destroyed, destroy_pollset, &proxy->pollset); + grpc_pollset_shutdown(&exec_ctx, proxy->pollset, &destroyed); + gpr_free(proxy); + grpc_exec_ctx_finish(&exec_ctx); +} + +const char *grpc_end2end_http_proxy_get_proxy_name( + grpc_end2end_http_proxy *proxy) { + return proxy->proxy_name; +} -- cgit v1.2.3 From 9f709a4a7e60e0fdc0e4cfdaaa70ee2d226a3100 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 29 Jul 2016 07:59:25 -0700 Subject: Work on debugging the HTTP proxy implementation. --- test/core/end2end/fixtures/h2_http_proxy.c | 10 ++- test/core/end2end/fixtures/http_proxy.c | 138 ++++++++++++++++++++++------- test/core/end2end/fixtures/http_proxy.h | 2 + 3 files changed, 119 insertions(+), 31 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c index d84f0b8cb9..002e2cb16b 100644 --- a/test/core/end2end/fixtures/h2_http_proxy.c +++ b/test/core/end2end/fixtures/h2_http_proxy.c @@ -52,6 +52,7 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#if 0 typedef struct fullstack_fixture_data { char *server_addr; grpc_end2end_http_proxy *proxy; @@ -110,17 +111,24 @@ static grpc_end2end_test_config configs[] = { chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; +#endif int main(int argc, char **argv) { - size_t i; +// size_t i; grpc_test_init(argc, argv); grpc_end2end_tests_pre_init(); grpc_init(); + grpc_end2end_http_proxy* proxy = grpc_end2end_http_proxy_create(); + grpc_end2end_http_proxy_start_thread(proxy); + grpc_end2end_http_proxy_destroy(proxy); + +#if 0 for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(argc, argv, configs[i]); } +#endif grpc_shutdown(); diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index a8d68f0249..975deaeb30 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -41,7 +41,7 @@ #include #include #include -//#include +#include #include #include "src/core/lib/channel/channel_args.h" @@ -58,6 +58,16 @@ #include "src/core/lib/iomgr/tcp_server.h" #include "test/core/util/port.h" +struct grpc_end2end_http_proxy { + char* proxy_name; + gpr_thd_id thd; + grpc_tcp_server* server; + grpc_channel_args* channel_args; + gpr_mu* mu; + grpc_pollset* pollset; + bool shutdown; +}; + // // Connection handling // @@ -83,10 +93,16 @@ typedef struct connection_data { grpc_http_parser http_parser; grpc_http_request http_request; + + grpc_end2end_http_proxy* proxy; + + gpr_refcount refcount; } connection_data; static void connection_data_destroy(grpc_exec_ctx* exec_ctx, connection_data* cd) { +gpr_log(GPR_ERROR, "==> %s()", __func__); + cd->proxy->shutdown = true; grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); if (cd->server_endpoint != NULL) grpc_endpoint_destroy(exec_ctx, cd->server_endpoint); @@ -103,62 +119,96 @@ static void connection_data_destroy(grpc_exec_ctx* exec_ctx, static void connection_data_failed(grpc_exec_ctx* exec_ctx, connection_data* cd, const char* prefix, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "%s: %s", prefix, msg); grpc_error_free_string(msg); GRPC_ERROR_UNREF(error); +gpr_log(GPR_ERROR, "HERE 0"); grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); +gpr_log(GPR_ERROR, "HERE 1"); if (cd->server_endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); - connection_data_destroy(exec_ctx, cd); +gpr_log(GPR_ERROR, "HERE 2"); + if (gpr_unref(&cd->refcount)) { +gpr_log(GPR_ERROR, "HERE 2.5"); + connection_data_destroy(exec_ctx, cd); + } +gpr_log(GPR_ERROR, "HERE 3"); } static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; - if (error != GRPC_ERROR_NONE) + if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); + return; + } + // Clear write buffer. + gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); } static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; - if (error != GRPC_ERROR_NONE) + if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); + return; + } + // Clear write buffer. + gpr_slice_buffer_reset_and_unref(&cd->server_write_buffer); } static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); return; } + // Move read data into write buffer and write it. gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, &cd->on_server_write_done); + // Read more data. + grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, + &cd->on_client_read_done); } static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); return; } + // Move read data into write buffer and write it. gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, &cd->on_client_write_done); + // Read more data. + grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, + &cd->on_server_read_done); } static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); return; } - // Set up proxying. + // Clear write buffer. + gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + // Start reading from both client and server. + // We increase the refcount by one, since we already held one reference + // for ourselves, and there will now be two pending callbacks. + gpr_ref(&cd->refcount); grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, &cd->on_client_read_done); grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, @@ -167,6 +217,7 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); @@ -174,8 +225,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, } // We've established a connection, so send back a 200 response code to // the client. - gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n"); - gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n\r\n"); gpr_slice_buffer_add(&cd->client_write_buffer, slice); grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, &cd->on_write_response_done); @@ -183,6 +233,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); @@ -240,12 +291,14 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* ep, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { // FIXME: remove -gpr_log(GPR_ERROR, "==> on_accept()"); +gpr_log(GPR_ERROR, "==> %s()", __func__); + grpc_end2end_http_proxy* proxy = arg; // Instantiate connection_data. connection_data* cd = gpr_malloc(sizeof(*cd)); memset(cd, 0, sizeof(*cd)); cd->client_endpoint = ep; cd->pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(exec_ctx, cd->pollset_set, proxy->pollset); grpc_closure_init(&cd->on_read_request_done, on_read_request_done, cd); grpc_closure_init(&cd->on_server_connect_done, on_server_connect_done, cd); grpc_closure_init(&cd->on_write_response_done, on_write_response_done, cd); @@ -259,6 +312,8 @@ gpr_log(GPR_ERROR, "==> on_accept()"); gpr_slice_buffer_init(&cd->server_write_buffer); grpc_http_parser_init(&cd->http_parser, GRPC_HTTP_REQUEST, &cd->http_request); + cd->proxy = proxy; + gpr_ref_init(&cd->refcount, 1); grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, &cd->on_read_request_done); } @@ -267,25 +322,6 @@ gpr_log(GPR_ERROR, "==> on_accept()"); // Proxy class // -struct grpc_end2end_http_proxy { - char* proxy_name; -// gpr_thd_id thd; - grpc_tcp_server* server; - grpc_channel_args* channel_args; - gpr_mu* mu; - grpc_pollset* pollset; -}; - -#if 0 -static void thread_main(void *arg) { - //grpc_end2end_http_proxy *proxy = arg; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - while (true) { - grpc_exec_ctx_flush(&exec_ctx); - } -} -#endif - grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy)); memset(proxy, 0, sizeof(*proxy)); @@ -315,7 +351,7 @@ gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); grpc_pollset_init(proxy->pollset, &proxy->mu); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, - on_accept, NULL); + on_accept, proxy); grpc_exec_ctx_finish(&exec_ctx); #if 0 // Start proxy thread. @@ -331,15 +367,21 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_pollset_destroy(p); } +// FIXME: remove (including all references below) +//#define USE_THREAD 1 + void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { +gpr_log(GPR_ERROR, "==> %s()", __func__); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); grpc_tcp_server_unref(&exec_ctx, proxy->server); -// gpr_thd_join(proxy->thd); +#ifdef USE_THREAD + gpr_thd_join(proxy->thd); +#endif gpr_free(proxy->proxy_name); grpc_channel_args_destroy(proxy->channel_args); grpc_closure destroyed; - grpc_closure_init(&destroyed, destroy_pollset, &proxy->pollset); + grpc_closure_init(&destroyed, destroy_pollset, proxy->pollset); grpc_pollset_shutdown(&exec_ctx, proxy->pollset, &destroyed); gpr_free(proxy); grpc_exec_ctx_finish(&exec_ctx); @@ -349,3 +391,39 @@ const char *grpc_end2end_http_proxy_get_proxy_name( grpc_end2end_http_proxy *proxy) { return proxy->proxy_name; } + +static void thread_main(void* arg) { +gpr_log(GPR_ERROR, "==> %s()", __func__); + grpc_end2end_http_proxy *proxy = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + do { +gpr_log(GPR_ERROR, "HERE a"); + const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + const gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); + grpc_pollset_worker *worker = NULL; +gpr_log(GPR_ERROR, "HERE b"); + gpr_mu_lock(proxy->mu); +gpr_log(GPR_ERROR, "HERE c"); + GRPC_LOG_IF_ERROR("grpc_pollset_work", + grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, + now, deadline)); +gpr_log(GPR_ERROR, "HERE d"); + gpr_mu_unlock(proxy->mu); +gpr_log(GPR_ERROR, "HERE e"); + grpc_exec_ctx_flush(&exec_ctx); +gpr_log(GPR_ERROR, "HERE f"); + } while (!proxy->shutdown); +gpr_log(GPR_ERROR, "HERE g"); + grpc_exec_ctx_finish(&exec_ctx); +} + +void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy) { +#ifdef USE_THREAD + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); +#else + thread_main(proxy); +#endif +} diff --git a/test/core/end2end/fixtures/http_proxy.h b/test/core/end2end/fixtures/http_proxy.h index 7af2ea92d0..444324e7e0 100644 --- a/test/core/end2end/fixtures/http_proxy.h +++ b/test/core/end2end/fixtures/http_proxy.h @@ -39,3 +39,5 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy); const char *grpc_end2end_http_proxy_get_proxy_name( grpc_end2end_http_proxy *proxy); + +void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy); -- cgit v1.2.3 From 8d7dc2772d7086d9ec6f71fb67f30e3b0c038eb3 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 29 Jul 2016 08:43:09 -0700 Subject: Fix use-after-free and memory leak problems. --- test/core/end2end/fixtures/http_proxy.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 975deaeb30..97aced3d1e 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -123,18 +123,12 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "%s: %s", prefix, msg); grpc_error_free_string(msg); - GRPC_ERROR_UNREF(error); -gpr_log(GPR_ERROR, "HERE 0"); grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); -gpr_log(GPR_ERROR, "HERE 1"); if (cd->server_endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); -gpr_log(GPR_ERROR, "HERE 2"); if (gpr_unref(&cd->refcount)) { -gpr_log(GPR_ERROR, "HERE 2.5"); connection_data_destroy(exec_ctx, cd); } -gpr_log(GPR_ERROR, "HERE 3"); } static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, @@ -365,6 +359,7 @@ gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { grpc_pollset_destroy(p); + gpr_free(p); } // FIXME: remove (including all references below) @@ -397,24 +392,17 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); grpc_end2end_http_proxy *proxy = arg; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; do { -gpr_log(GPR_ERROR, "HERE a"); const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); const gpr_timespec deadline = gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); grpc_pollset_worker *worker = NULL; -gpr_log(GPR_ERROR, "HERE b"); gpr_mu_lock(proxy->mu); -gpr_log(GPR_ERROR, "HERE c"); GRPC_LOG_IF_ERROR("grpc_pollset_work", grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, now, deadline)); -gpr_log(GPR_ERROR, "HERE d"); gpr_mu_unlock(proxy->mu); -gpr_log(GPR_ERROR, "HERE e"); grpc_exec_ctx_flush(&exec_ctx); -gpr_log(GPR_ERROR, "HERE f"); } while (!proxy->shutdown); -gpr_log(GPR_ERROR, "HERE g"); grpc_exec_ctx_finish(&exec_ctx); } -- cgit v1.2.3 From 477d061238798ffccc7842cbc312ef80a52663d3 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 29 Jul 2016 13:03:26 -0700 Subject: Clean up test. Still debugging things. --- test/core/end2end/fixtures/h2_http_proxy.c | 10 +-- test/core/end2end/fixtures/http_proxy.c | 103 +++++++++++++---------------- test/core/end2end/fixtures/http_proxy.h | 2 - 3 files changed, 46 insertions(+), 69 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c index 002e2cb16b..d84f0b8cb9 100644 --- a/test/core/end2end/fixtures/h2_http_proxy.c +++ b/test/core/end2end/fixtures/h2_http_proxy.c @@ -52,7 +52,6 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#if 0 typedef struct fullstack_fixture_data { char *server_addr; grpc_end2end_http_proxy *proxy; @@ -111,24 +110,17 @@ static grpc_end2end_test_config configs[] = { chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; -#endif int main(int argc, char **argv) { -// size_t i; + size_t i; grpc_test_init(argc, argv); grpc_end2end_tests_pre_init(); grpc_init(); - grpc_end2end_http_proxy* proxy = grpc_end2end_http_proxy_create(); - grpc_end2end_http_proxy_start_thread(proxy); - grpc_end2end_http_proxy_destroy(proxy); - -#if 0 for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(argc, argv, configs[i]); } -#endif grpc_shutdown(); diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 97aced3d1e..daf023958c 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -99,9 +99,9 @@ typedef struct connection_data { gpr_refcount refcount; } connection_data; +// Helper function to destroy the proxy connection. static void connection_data_destroy(grpc_exec_ctx* exec_ctx, connection_data* cd) { -gpr_log(GPR_ERROR, "==> %s()", __func__); cd->proxy->shutdown = true; grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); if (cd->server_endpoint != NULL) @@ -116,54 +116,59 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); gpr_free(cd); } +// Helper function to shut down the proxy connection. +// Does NOT take ownership of a reference to error. static void connection_data_failed(grpc_exec_ctx* exec_ctx, connection_data* cd, const char* prefix, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "%s: %s", prefix, msg); grpc_error_free_string(msg); grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); if (cd->server_endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); - if (gpr_unref(&cd->refcount)) { + if (gpr_unref(&cd->refcount)) connection_data_destroy(exec_ctx, cd); - } } +// Callback for writing proxy data to the client. static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); return; } - // Clear write buffer. + // Clear write buffer and release our reference. gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + gpr_unref(&cd->refcount); } +// Callback for writing proxy data to the backend server. static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); return; } - // Clear write buffer. + // Clear write buffer and release our reference. gpr_slice_buffer_reset_and_unref(&cd->server_write_buffer); + gpr_unref(&cd->refcount); } +// Callback for reading data from the client, which will be proxied to +// the backend server. static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); return; } // Move read data into write buffer and write it. + // Take a new ref for the write callback. + gpr_ref(&cd->refcount); gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, &cd->on_server_write_done); @@ -172,15 +177,18 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); &cd->on_client_read_done); } +// Callback for reading data from the backend server, which will be +// proxied to the client. static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); return; } // Move read data into write buffer and write it. + // Take a new ref for the write callback. + gpr_ref(&cd->refcount); gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, &cd->on_client_write_done); @@ -189,9 +197,9 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); &cd->on_server_read_done); } +// Callback to write the HTTP response for the CONNECT request. static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); @@ -209,9 +217,10 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); &cd->on_server_read_done); } +// Callback to connect to the backend server specified by the HTTP +// CONNECT request. static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); @@ -225,9 +234,9 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); &cd->on_write_response_done); } +// Callback to read the HTTP CONNECT request. static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); @@ -284,8 +293,6 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* ep, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { -// FIXME: remove -gpr_log(GPR_ERROR, "==> %s()", __func__); grpc_end2end_http_proxy* proxy = arg; // Instantiate connection_data. connection_data* cd = gpr_malloc(sizeof(*cd)); @@ -316,6 +323,24 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); // Proxy class // +static void thread_main(void* arg) { + grpc_end2end_http_proxy *proxy = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + do { + const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + const gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); + grpc_pollset_worker *worker = NULL; + gpr_mu_lock(proxy->mu); + GRPC_LOG_IF_ERROR("grpc_pollset_work", + grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, + now, deadline)); + gpr_mu_unlock(proxy->mu); + grpc_exec_ctx_flush(&exec_ctx); + } while (!proxy->shutdown); + grpc_exec_ctx_finish(&exec_ctx); +} + grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy)); memset(proxy, 0, sizeof(*proxy)); @@ -323,8 +348,6 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { const int proxy_port = grpc_pick_unused_port_or_die(); gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port); gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name); -// FIXME: remove -gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); // Create TCP server. proxy->channel_args = grpc_channel_args_copy(NULL); grpc_error* error = grpc_tcp_server_create( @@ -347,32 +370,25 @@ gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, on_accept, proxy); grpc_exec_ctx_finish(&exec_ctx); -#if 0 // Start proxy thread. gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); -#endif return proxy; } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_pollset_destroy(p); - gpr_free(p); + grpc_pollset* pollset = arg; + grpc_pollset_destroy(pollset); + gpr_free(pollset); } -// FIXME: remove (including all references below) -//#define USE_THREAD 1 - void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { -gpr_log(GPR_ERROR, "==> %s()", __func__); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_thd_join(proxy->thd); grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); grpc_tcp_server_unref(&exec_ctx, proxy->server); -#ifdef USE_THREAD - gpr_thd_join(proxy->thd); -#endif gpr_free(proxy->proxy_name); grpc_channel_args_destroy(proxy->channel_args); grpc_closure destroyed; @@ -386,32 +402,3 @@ const char *grpc_end2end_http_proxy_get_proxy_name( grpc_end2end_http_proxy *proxy) { return proxy->proxy_name; } - -static void thread_main(void* arg) { -gpr_log(GPR_ERROR, "==> %s()", __func__); - grpc_end2end_http_proxy *proxy = arg; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - do { - const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - const gpr_timespec deadline = - gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); - grpc_pollset_worker *worker = NULL; - gpr_mu_lock(proxy->mu); - GRPC_LOG_IF_ERROR("grpc_pollset_work", - grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, - now, deadline)); - gpr_mu_unlock(proxy->mu); - grpc_exec_ctx_flush(&exec_ctx); - } while (!proxy->shutdown); - grpc_exec_ctx_finish(&exec_ctx); -} - -void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy) { -#ifdef USE_THREAD - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); -#else - thread_main(proxy); -#endif -} diff --git a/test/core/end2end/fixtures/http_proxy.h b/test/core/end2end/fixtures/http_proxy.h index 444324e7e0..7af2ea92d0 100644 --- a/test/core/end2end/fixtures/http_proxy.h +++ b/test/core/end2end/fixtures/http_proxy.h @@ -39,5 +39,3 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy); const char *grpc_end2end_http_proxy_get_proxy_name( grpc_end2end_http_proxy *proxy); - -void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy); -- cgit v1.2.3 From 63aa0f745bb032eb743b5a480d009b45ed927221 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 2 Aug 2016 10:02:17 -0700 Subject: Trying to debug proxy tests. Not working yet. --- test/core/end2end/fixtures/http_proxy.c | 312 ++++++++++++++++++++------------ 1 file changed, 199 insertions(+), 113 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index daf023958c..3953687c26 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -72,10 +72,21 @@ struct grpc_end2end_http_proxy { // Connection handling // -typedef struct connection_data { +typedef struct proxy_connection { grpc_endpoint* client_endpoint; grpc_endpoint* server_endpoint; + gpr_refcount refcount; + bool client_shutdown; + bool server_shutdown; + bool client_write_pending; + bool server_write_pending; + +size_t client_bytes_read; +size_t client_bytes_written; +size_t server_bytes_read; +size_t server_bytes_written; + grpc_pollset_set* pollset_set; grpc_closure on_read_request_done; @@ -94,229 +105,304 @@ typedef struct connection_data { grpc_http_parser http_parser; grpc_http_request http_request; - grpc_end2end_http_proxy* proxy; - - gpr_refcount refcount; -} connection_data; + grpc_end2end_http_proxy* proxy; // Does not own. +} proxy_connection; // Helper function to destroy the proxy connection. -static void connection_data_destroy(grpc_exec_ctx* exec_ctx, - connection_data* cd) { - cd->proxy->shutdown = true; - grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); - if (cd->server_endpoint != NULL) - grpc_endpoint_destroy(exec_ctx, cd->server_endpoint); - grpc_pollset_set_destroy(cd->pollset_set); - gpr_slice_buffer_destroy(&cd->client_read_buffer); - gpr_slice_buffer_destroy(&cd->client_write_buffer); - gpr_slice_buffer_destroy(&cd->server_read_buffer); - gpr_slice_buffer_destroy(&cd->server_write_buffer); - grpc_http_parser_destroy(&cd->http_parser); - grpc_http_request_destroy(&cd->http_request); - gpr_free(cd); +static void proxy_connection_destroy(grpc_exec_ctx* exec_ctx, + proxy_connection* conn) { +gpr_log(GPR_INFO, "==> %s()", __func__); +gpr_log(GPR_INFO, "client_bytes_read=%lu", conn->client_bytes_read); +gpr_log(GPR_INFO, "server_bytes_written=%lu", conn->server_bytes_written); +gpr_log(GPR_INFO, "server_bytes_read=%lu", conn->server_bytes_read); +gpr_log(GPR_INFO, "client_bytes_written=%lu", conn->client_bytes_written); + // Tell the server to shut down when this connection is closed. + conn->proxy->shutdown = true; + grpc_endpoint_destroy(exec_ctx, conn->client_endpoint); + if (conn->server_endpoint != NULL) + grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); + grpc_pollset_set_destroy(conn->pollset_set); + gpr_slice_buffer_destroy(&conn->client_read_buffer); + gpr_slice_buffer_destroy(&conn->client_write_buffer); + gpr_slice_buffer_destroy(&conn->server_read_buffer); + gpr_slice_buffer_destroy(&conn->server_write_buffer); + grpc_http_parser_destroy(&conn->http_parser); + grpc_http_request_destroy(&conn->http_request); + gpr_free(conn); } // Helper function to shut down the proxy connection. // Does NOT take ownership of a reference to error. -static void connection_data_failed(grpc_exec_ctx* exec_ctx, - connection_data* cd, const char* prefix, - grpc_error* error) { +static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, + proxy_connection* conn, bool is_client, + const char* prefix, grpc_error* error) { +gpr_log(GPR_INFO, "==> %s()", __func__); const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "%s: %s", prefix, msg); grpc_error_free_string(msg); - grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); - if (cd->server_endpoint != NULL) - grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); - if (gpr_unref(&cd->refcount)) - connection_data_destroy(exec_ctx, cd); + if (is_client || !conn->client_write_pending) { + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); + conn->client_shutdown = true; + } + if (!is_client || !conn->server_write_pending) { + grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint); + conn->server_shutdown = true; + } + if (gpr_unref(&conn->refcount)) + proxy_connection_destroy(exec_ctx, conn); } +// Forward declarations. +static void do_client_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn); +static void do_server_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn); + // Callback for writing proxy data to the client. static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; + conn->client_write_pending = false; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy client write", error); return; } - // Clear write buffer and release our reference. - gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); - gpr_unref(&cd->refcount); + gpr_unref(&conn->refcount); + // Clear write buffer. +gpr_log(GPR_INFO, "wrote %lu bytes to client", conn->client_write_buffer.length); +conn->client_bytes_written += conn->client_write_buffer.length; + gpr_slice_buffer_reset_and_unref(&conn->client_write_buffer); + // If the server has been shut down, shut down the client now. + if (conn->server_shutdown) + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); } // Callback for writing proxy data to the backend server. static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; + conn->server_write_pending = false; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); + proxy_connection_failed(exec_ctx, conn, false /* is_client */, + "HTTP proxy server write", error); return; } - // Clear write buffer and release our reference. - gpr_slice_buffer_reset_and_unref(&cd->server_write_buffer); - gpr_unref(&cd->refcount); + gpr_unref(&conn->refcount); + // Clear write buffer. +gpr_log(GPR_INFO, "wrote %lu bytes to server", conn->server_write_buffer.length); +conn->server_bytes_written += conn->server_write_buffer.length; + gpr_slice_buffer_reset_and_unref(&conn->server_write_buffer); + // If the client has been shut down, shut down the server now. + if (conn->client_shutdown) + grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint); } // Callback for reading data from the client, which will be proxied to // the backend server. static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy client read", error); return; } // Move read data into write buffer and write it. - // Take a new ref for the write callback. - gpr_ref(&cd->refcount); - gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); - grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, - &cd->on_server_write_done); + // The write operation inherits our reference to conn. +gpr_log(GPR_INFO, "read %lu bytes from client", conn->client_read_buffer.length); +conn->client_bytes_read += conn->client_read_buffer.length; + gpr_slice_buffer_move_into(&conn->client_read_buffer, + &conn->server_write_buffer); + conn->server_write_pending = true; + grpc_endpoint_write(exec_ctx, conn->server_endpoint, + &conn->server_write_buffer, &conn->on_server_write_done); // Read more data. - grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, - &cd->on_client_read_done); + do_client_read(exec_ctx, conn); } // Callback for reading data from the backend server, which will be // proxied to the client. static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); + proxy_connection_failed(exec_ctx, conn, false /* is_client */, + "HTTP proxy server read", error); return; } // Move read data into write buffer and write it. - // Take a new ref for the write callback. - gpr_ref(&cd->refcount); - gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); - grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, - &cd->on_client_write_done); + // The write operation inherits our reference to conn. +gpr_log(GPR_INFO, "read %lu bytes from server", conn->server_read_buffer.length); +conn->server_bytes_read += conn->server_read_buffer.length; + gpr_slice_buffer_move_into(&conn->server_read_buffer, + &conn->client_write_buffer); + conn->client_write_pending = true; + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, &conn->on_client_write_done); // Read more data. - grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, - &cd->on_server_read_done); + do_server_read(exec_ctx, conn); } // Callback to write the HTTP response for the CONNECT request. static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy write response", error); return; } + gpr_unref(&conn->refcount); // Clear write buffer. - gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + gpr_slice_buffer_reset_and_unref(&conn->client_write_buffer); // Start reading from both client and server. - // We increase the refcount by one, since we already held one reference - // for ourselves, and there will now be two pending callbacks. - gpr_ref(&cd->refcount); - grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, - &cd->on_client_read_done); - grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, - &cd->on_server_read_done); + do_client_read(exec_ctx, conn); + do_server_read(exec_ctx, conn); +} + +// Start a read from the client. +static void do_client_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn) { + gpr_ref(&conn->refcount); + grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, + &conn->on_client_read_done); +} + +// Start a read from the server. +static void do_server_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn) { + gpr_ref(&conn->refcount); + grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, + &conn->on_server_read_done); } // Callback to connect to the backend server specified by the HTTP // CONNECT request. static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); + // TODO(roth): Technically, in this case, we should handle the error + // by returning an HTTP response to the client indicating that the + // connection failed. However, for the purposes of this test code, + // it's fine to pretend this is a client-side error, which will + // cause the client connection to be dropped. + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy server connect", error); return; } // We've established a connection, so send back a 200 response code to // the client. + // The write callback inherits our reference to conn. gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n\r\n"); - gpr_slice_buffer_add(&cd->client_write_buffer, slice); - grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, - &cd->on_write_response_done); + gpr_slice_buffer_add(&conn->client_write_buffer, slice); + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, + &conn->on_write_response_done); } // Callback to read the HTTP CONNECT request. +// TODO(roth): Technically, for any of the failure modes handled by this +// function, we should handle the error by returning an HTTP response to +// the client indicating that the request failed. However, for the purposes +// of this test code, it's fine to pretend this is a client-side error, +// which will cause the client connection to be dropped. static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy read request", error); return; } // Read request and feed it to the parser. - for (size_t i = 0; i < cd->client_read_buffer.count; ++i) { - if (GPR_SLICE_LENGTH(cd->client_read_buffer.slices[i]) > 0) { + for (size_t i = 0; i < conn->client_read_buffer.count; ++i) { + if (GPR_SLICE_LENGTH(conn->client_read_buffer.slices[i]) > 0) { error = grpc_http_parser_parse( - &cd->http_parser, cd->client_read_buffer.slices[i]); + &conn->http_parser, conn->client_read_buffer.slices[i]); if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy request parse", - error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy request parse", error); + GRPC_ERROR_UNREF(error); return; } } } - gpr_slice_buffer_reset_and_unref(&cd->client_read_buffer); + gpr_slice_buffer_reset_and_unref(&conn->client_read_buffer); // If we're not done reading the request, read more data. - if (cd->http_parser.state != GRPC_HTTP_BODY) { - grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, - &cd->on_read_request_done); + if (conn->http_parser.state != GRPC_HTTP_BODY) { + grpc_endpoint_read(exec_ctx, conn->client_endpoint, + &conn->client_read_buffer, &conn->on_read_request_done); return; } // Make sure we got a CONNECT request. - if (strcmp(cd->http_request.method, "CONNECT") != 0) { + if (strcmp(conn->http_request.method, "CONNECT") != 0) { char* msg; gpr_asprintf(&msg, "HTTP proxy got request method %s", - cd->http_request.method); + conn->http_request.method); error = GRPC_ERROR_CREATE(msg); gpr_free(msg); - connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy read request", error); + GRPC_ERROR_UNREF(error); return; } // Resolve address. grpc_resolved_addresses* resolved_addresses = NULL; - error = grpc_blocking_resolve_address(cd->http_request.path, "80", + error = grpc_blocking_resolve_address(conn->http_request.path, "80", &resolved_addresses); if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy DNS lookup", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy DNS lookup", error); + GRPC_ERROR_UNREF(error); return; } GPR_ASSERT(resolved_addresses->naddrs >= 1); // Connect to requested address. + // The connection callback inherits our reference to conn. const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(10, GPR_TIMESPAN)); - grpc_tcp_client_connect(exec_ctx, &cd->on_server_connect_done, - &cd->server_endpoint, cd->pollset_set, + grpc_tcp_client_connect(exec_ctx, &conn->on_server_connect_done, + &conn->server_endpoint, conn->pollset_set, (struct sockaddr*)&resolved_addresses->addrs[0].addr, resolved_addresses->addrs[0].len, deadline); grpc_resolved_addresses_destroy(resolved_addresses); } static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, - grpc_endpoint* ep, grpc_pollset* accepting_pollset, + grpc_endpoint* endpoint, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { +gpr_log(GPR_INFO, "==> %s()", __func__); grpc_end2end_http_proxy* proxy = arg; - // Instantiate connection_data. - connection_data* cd = gpr_malloc(sizeof(*cd)); - memset(cd, 0, sizeof(*cd)); - cd->client_endpoint = ep; - cd->pollset_set = grpc_pollset_set_create(); - grpc_pollset_set_add_pollset(exec_ctx, cd->pollset_set, proxy->pollset); - grpc_closure_init(&cd->on_read_request_done, on_read_request_done, cd); - grpc_closure_init(&cd->on_server_connect_done, on_server_connect_done, cd); - grpc_closure_init(&cd->on_write_response_done, on_write_response_done, cd); - grpc_closure_init(&cd->on_client_read_done, on_client_read_done, cd); - grpc_closure_init(&cd->on_client_write_done, on_client_write_done, cd); - grpc_closure_init(&cd->on_server_read_done, on_server_read_done, cd); - grpc_closure_init(&cd->on_server_write_done, on_server_write_done, cd); - gpr_slice_buffer_init(&cd->client_read_buffer); - gpr_slice_buffer_init(&cd->client_write_buffer); - gpr_slice_buffer_init(&cd->server_read_buffer); - gpr_slice_buffer_init(&cd->server_write_buffer); - grpc_http_parser_init(&cd->http_parser, GRPC_HTTP_REQUEST, - &cd->http_request); - cd->proxy = proxy; - gpr_ref_init(&cd->refcount, 1); - grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, - &cd->on_read_request_done); + // Instantiate proxy_connection. + proxy_connection* conn = gpr_malloc(sizeof(*conn)); + memset(conn, 0, sizeof(*conn)); + conn->client_endpoint = endpoint; + gpr_ref_init(&conn->refcount, 1); + conn->pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(exec_ctx, conn->pollset_set, proxy->pollset); + grpc_closure_init(&conn->on_read_request_done, on_read_request_done, conn); + grpc_closure_init(&conn->on_server_connect_done, on_server_connect_done, + conn); + grpc_closure_init(&conn->on_write_response_done, on_write_response_done, + conn); + grpc_closure_init(&conn->on_client_read_done, on_client_read_done, conn); + grpc_closure_init(&conn->on_client_write_done, on_client_write_done, conn); + grpc_closure_init(&conn->on_server_read_done, on_server_read_done, conn); + grpc_closure_init(&conn->on_server_write_done, on_server_write_done, conn); + gpr_slice_buffer_init(&conn->client_read_buffer); + gpr_slice_buffer_init(&conn->client_write_buffer); + gpr_slice_buffer_init(&conn->server_read_buffer); + gpr_slice_buffer_init(&conn->server_write_buffer); + grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, + &conn->http_request); + conn->proxy = proxy; + grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, + &conn->on_read_request_done); } // @@ -329,7 +415,7 @@ static void thread_main(void* arg) { do { const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); const gpr_timespec deadline = - gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); + gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN)); grpc_pollset_worker *worker = NULL; gpr_mu_lock(proxy->mu); GRPC_LOG_IF_ERROR("grpc_pollset_work", -- cgit v1.2.3 From 714c7ec74aab3e99d5e577eddd3665cbcbca5dd6 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 4 Aug 2016 12:58:16 -0700 Subject: Plumbed read_buffer through and fixed leftover bytes problem. --- .../ext/client_config/http_connect_handshaker.c | 52 +++++++++++++++------- .../transport/chttp2/transport/chttp2_transport.c | 3 +- src/core/lib/http/httpcli.c | 2 +- src/core/lib/http/parser.c | 24 ++++++---- src/core/lib/http/parser.h | 4 +- test/core/end2end/fixtures/http_proxy.c | 5 ++- test/core/http/parser_test.c | 10 +++-- test/core/http/request_fuzzer.c | 2 +- test/core/http/response_fuzzer.c | 2 +- 9 files changed, 67 insertions(+), 37 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c index 25851c2efb..e6660fe2d4 100644 --- a/src/core/ext/client_config/http_connect_handshaker.c +++ b/src/core/ext/client_config/http_connect_handshaker.c @@ -56,9 +56,9 @@ typedef struct http_connect_handshaker { void* user_data; // Objects for processing the HTTP CONNECT request and response. - gpr_slice_buffer request_buffer; + gpr_slice_buffer write_buffer; + gpr_slice_buffer* read_buffer; grpc_closure request_done_closure; - gpr_slice_buffer response_buffer; grpc_closure response_read_closure; grpc_http_parser http_parser; grpc_http_response http_response; @@ -70,10 +70,11 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, http_connect_handshaker* h = arg; if (error != GRPC_ERROR_NONE) { // If the write failed, invoke the callback immediately with the error. - h->cb(exec_ctx, h->endpoint, h->args, h->user_data, GRPC_ERROR_REF(error)); + h->cb(exec_ctx, h->endpoint, h->args, h->read_buffer, h->user_data, + GRPC_ERROR_REF(error)); } else { // Otherwise, read the response. - grpc_endpoint_read(exec_ctx, h->endpoint, &h->response_buffer, + grpc_endpoint_read(exec_ctx, h->endpoint, h->read_buffer, &h->response_read_closure); } } @@ -87,12 +88,29 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, goto done; } // Add buffer to parser. - for (size_t i = 0; i < h->response_buffer.count; ++i) { - if (GPR_SLICE_LENGTH(h->response_buffer.slices[i]) > 0) { + for (size_t i = 0; i < h->read_buffer->count; ++i) { + if (GPR_SLICE_LENGTH(h->read_buffer->slices[i]) > 0) { + size_t body_start_offset = 0; error = grpc_http_parser_parse( - &h->http_parser, h->response_buffer.slices[i]); + &h->http_parser, h->read_buffer->slices[i], &body_start_offset); if (error != GRPC_ERROR_NONE) goto done; + if (h->http_parser.state == GRPC_HTTP_BODY) { + // Remove the data we've already read from the read buffer, + // leaving only the leftover bytes (if any). + gpr_slice_buffer tmp_buffer; + gpr_slice_buffer_init(&tmp_buffer); + if (body_start_offset < GPR_SLICE_LENGTH(h->read_buffer->slices[i])) { + gpr_slice_buffer_add(&tmp_buffer, + gpr_slice_split_tail(&h->read_buffer->slices[i], + body_start_offset)); + } + gpr_slice_buffer_addn(&tmp_buffer, &h->read_buffer->slices[i + 1], + h->read_buffer->count - i - 1); + gpr_slice_buffer_swap(h->read_buffer, &tmp_buffer); + gpr_slice_buffer_destroy(&tmp_buffer); + break; + } } } // If we're not done reading the response, read more data. @@ -107,8 +125,8 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, // complete (e.g., handling chunked transfer encoding or looking // at the Content-Length: header). if (h->http_parser.state != GRPC_HTTP_BODY) { - gpr_slice_buffer_reset_and_unref(&h->response_buffer); - grpc_endpoint_read(exec_ctx, h->endpoint, &h->response_buffer, + gpr_slice_buffer_reset_and_unref(h->read_buffer); + grpc_endpoint_read(exec_ctx, h->endpoint, h->read_buffer, &h->response_read_closure); return; } @@ -122,7 +140,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, } done: // Invoke handshake-done callback. - h->cb(exec_ctx, h->endpoint, h->args, h->user_data, error); + h->cb(exec_ctx, h->endpoint, h->args, h->read_buffer, h->user_data, error); } // @@ -134,8 +152,7 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, http_connect_handshaker* h = (http_connect_handshaker*)handshaker; gpr_free(h->proxy_server); gpr_free(h->server_name); - gpr_slice_buffer_destroy(&h->request_buffer); - gpr_slice_buffer_destroy(&h->response_buffer); + gpr_slice_buffer_destroy(&h->write_buffer); grpc_http_parser_destroy(&h->http_parser); grpc_http_response_destroy(&h->http_response); gpr_free(h); @@ -148,7 +165,8 @@ static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, // FIXME BEFORE MERGING: apply deadline static void http_connect_handshaker_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, - grpc_endpoint* endpoint, grpc_channel_args* args, gpr_timespec deadline, + grpc_endpoint* endpoint, grpc_channel_args* args, + gpr_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data) { http_connect_handshaker* h = (http_connect_handshaker*)handshaker; @@ -158,9 +176,9 @@ static void http_connect_handshaker_do_handshake( h->cb = cb; h->user_data = user_data; // Initialize fields. - gpr_slice_buffer_init(&h->request_buffer); + gpr_slice_buffer_init(&h->write_buffer); + h->read_buffer = read_buffer; grpc_closure_init(&h->request_done_closure, on_write_done, h); - gpr_slice_buffer_init(&h->response_buffer); grpc_closure_init(&h->response_read_closure, on_read_done, h); grpc_http_parser_init(&h->http_parser, GRPC_HTTP_RESPONSE, &h->http_response); @@ -174,8 +192,8 @@ static void http_connect_handshaker_do_handshake( request.http.path = h->server_name; request.handshaker = &grpc_httpcli_plaintext; gpr_slice request_slice = grpc_httpcli_format_connect_request(&request); - gpr_slice_buffer_add(&h->request_buffer, request_slice); - grpc_endpoint_write(exec_ctx, endpoint, &h->request_buffer, + gpr_slice_buffer_add(&h->write_buffer, request_slice); + grpc_endpoint_write(exec_ctx, endpoint, &h->write_buffer, &h->request_done_closure); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index f2f5465201..6c608c8013 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1987,7 +1987,8 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_error *parse_error = GRPC_ERROR_NONE; for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) { - parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]); + parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i], + NULL); } if (parse_error == GRPC_ERROR_NONE && (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) { diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 18135bcb58..7f3c2d120d 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -146,7 +146,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, if (GPR_SLICE_LENGTH(req->incoming.slices[i])) { req->have_read_byte = 1; grpc_error *err = - grpc_http_parser_parse(&req->parser, req->incoming.slices[i]); + grpc_http_parser_parse(&req->parser, req->incoming.slices[i], NULL); if (err != GRPC_ERROR_NONE) { finish(exec_ctx, req, err); return; diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index d3bac5b876..cbf26811f7 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -33,6 +33,7 @@ #include "src/core/lib/http/parser.h" +#include #include #include @@ -200,7 +201,8 @@ done: return error; } -static grpc_error *finish_line(grpc_http_parser *parser) { +static grpc_error *finish_line(grpc_http_parser *parser, + bool *found_body_start) { grpc_error *err; switch (parser->state) { case GRPC_HTTP_FIRST_LINE: @@ -211,6 +213,7 @@ static grpc_error *finish_line(grpc_http_parser *parser) { case GRPC_HTTP_HEADERS: if (parser->cur_line_length == parser->cur_line_end_length) { parser->state = GRPC_HTTP_BODY; + *found_body_start = true; break; } err = add_header(parser); @@ -274,7 +277,8 @@ static bool check_line(grpc_http_parser *parser) { return false; } -static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte) { +static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte, + bool *found_body_start) { switch (parser->state) { case GRPC_HTTP_FIRST_LINE: case GRPC_HTTP_HEADERS: @@ -287,7 +291,7 @@ static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte) { parser->cur_line[parser->cur_line_length] = byte; parser->cur_line_length++; if (check_line(parser)) { - return finish_line(parser); + return finish_line(parser, found_body_start); } return GRPC_ERROR_NONE; case GRPC_HTTP_BODY: @@ -329,14 +333,16 @@ void grpc_http_response_destroy(grpc_http_response *response) { gpr_free(response->hdrs); } -grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice) { - size_t i; - - for (i = 0; i < GPR_SLICE_LENGTH(slice); i++) { - grpc_error *err = addbyte(parser, GPR_SLICE_START_PTR(slice)[i]); +grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice, + size_t *start_of_body) { + for (size_t i = 0; i < GPR_SLICE_LENGTH(slice); i++) { + bool found_body_start = false; + grpc_error *err = addbyte(parser, GPR_SLICE_START_PTR(slice)[i], + &found_body_start); if (err != GRPC_ERROR_NONE) return err; + if (found_body_start && start_of_body != NULL) + *start_of_body = i + 1; } - return GRPC_ERROR_NONE; } diff --git a/src/core/lib/http/parser.h b/src/core/lib/http/parser.h index 6df3cc8b13..fab42979cd 100644 --- a/src/core/lib/http/parser.h +++ b/src/core/lib/http/parser.h @@ -113,7 +113,9 @@ void grpc_http_parser_init(grpc_http_parser *parser, grpc_http_type type, void *request_or_response); void grpc_http_parser_destroy(grpc_http_parser *parser); -grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice); +/* Sets \a start_of_body to the offset in \a slice of the start of the body. */ +grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice, + size_t *start_of_body); grpc_error *grpc_http_parser_eof(grpc_http_parser *parser); void grpc_http_request_destroy(grpc_http_request *request); diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 3953687c26..77e0d9942b 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -297,7 +297,8 @@ gpr_log(GPR_INFO, "==> %s()", __func__); // We've established a connection, so send back a 200 response code to // the client. // The write callback inherits our reference to conn. - gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n\r\n"); + gpr_slice slice = + gpr_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n"); gpr_slice_buffer_add(&conn->client_write_buffer, slice); grpc_endpoint_write(exec_ctx, conn->client_endpoint, &conn->client_write_buffer, @@ -323,7 +324,7 @@ gpr_log(GPR_INFO, "==> %s()", __func__); for (size_t i = 0; i < conn->client_read_buffer.count; ++i) { if (GPR_SLICE_LENGTH(conn->client_read_buffer.slices[i]) > 0) { error = grpc_http_parser_parse( - &conn->http_parser, conn->client_read_buffer.slices[i]); + &conn->http_parser, conn->client_read_buffer.slices[i], NULL); if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, "HTTP proxy request parse", error); diff --git a/test/core/http/parser_test.c b/test/core/http/parser_test.c index d645d2879c..211690eff9 100644 --- a/test/core/http/parser_test.c +++ b/test/core/http/parser_test.c @@ -62,7 +62,8 @@ static void test_request_succeeds(grpc_slice_split_mode split_mode, grpc_http_parser_init(&parser, GRPC_HTTP_REQUEST, &request); for (i = 0; i < num_slices; i++) { - GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i]) == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i], NULL) + == GRPC_ERROR_NONE); gpr_slice_unref(slices[i]); } GPR_ASSERT(grpc_http_parser_eof(&parser) == GRPC_ERROR_NONE); @@ -118,7 +119,8 @@ static void test_succeeds(grpc_slice_split_mode split_mode, char *response_text, grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response); for (i = 0; i < num_slices; i++) { - GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i]) == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i], NULL) + == GRPC_ERROR_NONE); gpr_slice_unref(slices[i]); } GPR_ASSERT(grpc_http_parser_eof(&parser) == GRPC_ERROR_NONE); @@ -171,7 +173,7 @@ static void test_fails(grpc_slice_split_mode split_mode, char *response_text) { for (i = 0; i < num_slices; i++) { if (GRPC_ERROR_NONE == error) { - error = grpc_http_parser_parse(&parser, slices[i]); + error = grpc_http_parser_parse(&parser, slices[i], NULL); } gpr_slice_unref(slices[i]); } @@ -204,7 +206,7 @@ static void test_request_fails(grpc_slice_split_mode split_mode, for (i = 0; i < num_slices; i++) { if (error == GRPC_ERROR_NONE) { - error = grpc_http_parser_parse(&parser, slices[i]); + error = grpc_http_parser_parse(&parser, slices[i], NULL); } gpr_slice_unref(slices[i]); } diff --git a/test/core/http/request_fuzzer.c b/test/core/http/request_fuzzer.c index 5941401867..bb6cb92c0c 100644 --- a/test/core/http/request_fuzzer.c +++ b/test/core/http/request_fuzzer.c @@ -48,7 +48,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { memset(&request, 0, sizeof(request)); grpc_http_parser_init(&parser, GRPC_HTTP_REQUEST, &request); gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size); - GRPC_ERROR_UNREF(grpc_http_parser_parse(&parser, slice)); + GRPC_ERROR_UNREF(grpc_http_parser_parse(&parser, slice, NULL)); GRPC_ERROR_UNREF(grpc_http_parser_eof(&parser)); gpr_slice_unref(slice); grpc_http_parser_destroy(&parser); diff --git a/test/core/http/response_fuzzer.c b/test/core/http/response_fuzzer.c index acde7c80a4..4393840484 100644 --- a/test/core/http/response_fuzzer.c +++ b/test/core/http/response_fuzzer.c @@ -47,7 +47,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { memset(&response, 0, sizeof(response)); grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response); gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size); - GRPC_ERROR_UNREF(grpc_http_parser_parse(&parser, slice)); + GRPC_ERROR_UNREF(grpc_http_parser_parse(&parser, slice, NULL)); GRPC_ERROR_UNREF(grpc_http_parser_eof(&parser)); gpr_slice_unref(slice); grpc_http_parser_destroy(&parser); -- cgit v1.2.3 From b350209b9f60fe64bcdef65f4df41093bc030d61 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 4 Aug 2016 13:04:30 -0700 Subject: Clean up test HTTP proxy code. --- test/core/end2end/fixtures/http_proxy.c | 91 +++++++++------------------------ 1 file changed, 24 insertions(+), 67 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 77e0d9942b..0cdb2884ee 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -77,15 +77,6 @@ typedef struct proxy_connection { grpc_endpoint* server_endpoint; gpr_refcount refcount; - bool client_shutdown; - bool server_shutdown; - bool client_write_pending; - bool server_write_pending; - -size_t client_bytes_read; -size_t client_bytes_written; -size_t server_bytes_read; -size_t server_bytes_written; grpc_pollset_set* pollset_set; @@ -109,26 +100,23 @@ size_t server_bytes_written; } proxy_connection; // Helper function to destroy the proxy connection. -static void proxy_connection_destroy(grpc_exec_ctx* exec_ctx, - proxy_connection* conn) { -gpr_log(GPR_INFO, "==> %s()", __func__); -gpr_log(GPR_INFO, "client_bytes_read=%lu", conn->client_bytes_read); -gpr_log(GPR_INFO, "server_bytes_written=%lu", conn->server_bytes_written); -gpr_log(GPR_INFO, "server_bytes_read=%lu", conn->server_bytes_read); -gpr_log(GPR_INFO, "client_bytes_written=%lu", conn->client_bytes_written); - // Tell the server to shut down when this connection is closed. - conn->proxy->shutdown = true; - grpc_endpoint_destroy(exec_ctx, conn->client_endpoint); - if (conn->server_endpoint != NULL) - grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); - grpc_pollset_set_destroy(conn->pollset_set); - gpr_slice_buffer_destroy(&conn->client_read_buffer); - gpr_slice_buffer_destroy(&conn->client_write_buffer); - gpr_slice_buffer_destroy(&conn->server_read_buffer); - gpr_slice_buffer_destroy(&conn->server_write_buffer); - grpc_http_parser_destroy(&conn->http_parser); - grpc_http_request_destroy(&conn->http_request); - gpr_free(conn); +static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, + proxy_connection* conn) { + if (gpr_unref(&conn->refcount)) { + // Tell the server to shut down when this connection is closed. + conn->proxy->shutdown = true; + grpc_endpoint_destroy(exec_ctx, conn->client_endpoint); + if (conn->server_endpoint != NULL) + grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); + grpc_pollset_set_destroy(conn->pollset_set); + gpr_slice_buffer_destroy(&conn->client_read_buffer); + gpr_slice_buffer_destroy(&conn->client_write_buffer); + gpr_slice_buffer_destroy(&conn->server_read_buffer); + gpr_slice_buffer_destroy(&conn->server_write_buffer); + grpc_http_parser_destroy(&conn->http_parser); + grpc_http_request_destroy(&conn->http_request); + gpr_free(conn); + } } // Helper function to shut down the proxy connection. @@ -136,20 +124,13 @@ gpr_log(GPR_INFO, "client_bytes_written=%lu", conn->client_bytes_written); static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, proxy_connection* conn, bool is_client, const char* prefix, grpc_error* error) { -gpr_log(GPR_INFO, "==> %s()", __func__); const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "%s: %s", prefix, msg); grpc_error_free_string(msg); - if (is_client || !conn->client_write_pending) { - grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); - conn->client_shutdown = true; - } - if (!is_client || !conn->server_write_pending) { + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); + if (conn->server_endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint); - conn->server_shutdown = true; - } - if (gpr_unref(&conn->refcount)) - proxy_connection_destroy(exec_ctx, conn); + proxy_connection_unref(exec_ctx, conn); } // Forward declarations. @@ -159,50 +140,37 @@ static void do_server_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn); // Callback for writing proxy data to the client. static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_INFO, "==> %s()", __func__); proxy_connection* conn = arg; - conn->client_write_pending = false; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, "HTTP proxy client write", error); return; } - gpr_unref(&conn->refcount); // Clear write buffer. -gpr_log(GPR_INFO, "wrote %lu bytes to client", conn->client_write_buffer.length); -conn->client_bytes_written += conn->client_write_buffer.length; gpr_slice_buffer_reset_and_unref(&conn->client_write_buffer); - // If the server has been shut down, shut down the client now. - if (conn->server_shutdown) - grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); + // Unref the connection. + proxy_connection_unref(exec_ctx, conn); } // Callback for writing proxy data to the backend server. static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_INFO, "==> %s()", __func__); proxy_connection* conn = arg; - conn->server_write_pending = false; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, false /* is_client */, "HTTP proxy server write", error); return; } - gpr_unref(&conn->refcount); // Clear write buffer. -gpr_log(GPR_INFO, "wrote %lu bytes to server", conn->server_write_buffer.length); -conn->server_bytes_written += conn->server_write_buffer.length; gpr_slice_buffer_reset_and_unref(&conn->server_write_buffer); - // If the client has been shut down, shut down the server now. - if (conn->client_shutdown) - grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint); + // Unref the connection. + proxy_connection_unref(exec_ctx, conn); } // Callback for reading data from the client, which will be proxied to // the backend server. static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_INFO, "==> %s()", __func__); proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, @@ -211,11 +179,8 @@ gpr_log(GPR_INFO, "==> %s()", __func__); } // Move read data into write buffer and write it. // The write operation inherits our reference to conn. -gpr_log(GPR_INFO, "read %lu bytes from client", conn->client_read_buffer.length); -conn->client_bytes_read += conn->client_read_buffer.length; gpr_slice_buffer_move_into(&conn->client_read_buffer, &conn->server_write_buffer); - conn->server_write_pending = true; grpc_endpoint_write(exec_ctx, conn->server_endpoint, &conn->server_write_buffer, &conn->on_server_write_done); // Read more data. @@ -226,7 +191,6 @@ conn->client_bytes_read += conn->client_read_buffer.length; // proxied to the client. static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_INFO, "==> %s()", __func__); proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, false /* is_client */, @@ -235,11 +199,8 @@ gpr_log(GPR_INFO, "==> %s()", __func__); } // Move read data into write buffer and write it. // The write operation inherits our reference to conn. -gpr_log(GPR_INFO, "read %lu bytes from server", conn->server_read_buffer.length); -conn->server_bytes_read += conn->server_read_buffer.length; gpr_slice_buffer_move_into(&conn->server_read_buffer, &conn->client_write_buffer); - conn->client_write_pending = true; grpc_endpoint_write(exec_ctx, conn->client_endpoint, &conn->client_write_buffer, &conn->on_client_write_done); // Read more data. @@ -249,7 +210,6 @@ conn->server_bytes_read += conn->server_read_buffer.length; // Callback to write the HTTP response for the CONNECT request. static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_INFO, "==> %s()", __func__); proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, @@ -282,7 +242,6 @@ static void do_server_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn) { // CONNECT request. static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_INFO, "==> %s()", __func__); proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { // TODO(roth): Technically, in this case, we should handle the error @@ -313,7 +272,6 @@ gpr_log(GPR_INFO, "==> %s()", __func__); // which will cause the client connection to be dropped. static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_INFO, "==> %s()", __func__); proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, @@ -377,7 +335,6 @@ gpr_log(GPR_INFO, "==> %s()", __func__); static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* endpoint, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { -gpr_log(GPR_INFO, "==> %s()", __func__); grpc_end2end_http_proxy* proxy = arg; // Instantiate proxy_connection. proxy_connection* conn = gpr_malloc(sizeof(*conn)); -- cgit v1.2.3 From 0a05ab6e8e8e605f0fe4f2d0f4373d660bbdfe06 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 4 Aug 2016 13:10:13 -0700 Subject: clang-format --- .../ext/client_config/http_connect_handshaker.c | 22 +++++++------- src/core/ext/client_config/resolver_registry.c | 3 +- .../chttp2/client/insecure/channel_create.c | 7 ++--- .../chttp2/client/secure/secure_channel_create.c | 7 ++--- .../transport/chttp2/transport/chttp2_transport.c | 4 +-- src/core/lib/http/parser.c | 7 ++--- test/core/end2end/fixtures/h2_http_proxy.c | 2 +- test/core/end2end/fixtures/http_proxy.c | 34 +++++++++++----------- test/core/end2end/fixtures/http_proxy.h | 4 +-- test/core/http/parser_test.c | 8 ++--- 10 files changed, 46 insertions(+), 52 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c index e6660fe2d4..54f592ef61 100644 --- a/src/core/ext/client_config/http_connect_handshaker.c +++ b/src/core/ext/client_config/http_connect_handshaker.c @@ -31,6 +31,8 @@ * */ +#include "src/core/ext/client_config/http_connect_handshaker.h" + #include #include @@ -40,7 +42,6 @@ #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" -#include "src/core/ext/client_config/http_connect_handshaker.h" typedef struct http_connect_handshaker { // Base class. Must be first. @@ -91,10 +92,9 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, for (size_t i = 0; i < h->read_buffer->count; ++i) { if (GPR_SLICE_LENGTH(h->read_buffer->slices[i]) > 0) { size_t body_start_offset = 0; - error = grpc_http_parser_parse( - &h->http_parser, h->read_buffer->slices[i], &body_start_offset); - if (error != GRPC_ERROR_NONE) - goto done; + error = grpc_http_parser_parse(&h->http_parser, h->read_buffer->slices[i], + &body_start_offset); + if (error != GRPC_ERROR_NONE) goto done; if (h->http_parser.state == GRPC_HTTP_BODY) { // Remove the data we've already read from the read buffer, // leaving only the leftover bytes (if any). @@ -138,7 +138,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, error = GRPC_ERROR_CREATE(msg); gpr_free(msg); } - done: +done: // Invoke handshake-done callback. h->cb(exec_ctx, h->endpoint, h->args, h->read_buffer, h->user_data, error); } @@ -159,8 +159,7 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) { -} + grpc_handshaker* handshaker) {} // FIXME BEFORE MERGING: apply deadline static void http_connect_handshaker_do_handshake( @@ -180,11 +179,10 @@ static void http_connect_handshaker_do_handshake( h->read_buffer = read_buffer; grpc_closure_init(&h->request_done_closure, on_write_done, h); grpc_closure_init(&h->response_read_closure, on_read_done, h); - grpc_http_parser_init(&h->http_parser, GRPC_HTTP_RESPONSE, - &h->http_response); + grpc_http_parser_init(&h->http_parser, GRPC_HTTP_RESPONSE, &h->http_response); // Send HTTP CONNECT request. - gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", - h->server_name, h->proxy_server); + gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", h->server_name, + h->proxy_server); grpc_httpcli_request request; memset(&request, 0, sizeof(request)); request.host = h->proxy_server; diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c index 13f08e9fe6..5a8f137103 100644 --- a/src/core/ext/client_config/resolver_registry.c +++ b/src/core/ext/client_config/resolver_registry.c @@ -140,8 +140,7 @@ grpc_resolver *grpc_resolver_create( args.client_channel_factory = client_channel_factory; resolver = grpc_resolver_factory_create_resolver(factory, &args); const char *proxy = grpc_uri_get_query_arg(uri, "http_proxy"); - if (proxy != NULL) - *http_proxy = gpr_strdup(proxy); + if (proxy != NULL) *http_proxy = gpr_strdup(proxy); grpc_uri_destroy(uri); return resolver; } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 7f0b13e321..475224effd 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -180,8 +180,7 @@ static void client_channel_factory_unref( "client_channel_factory"); } grpc_channel_args_destroy(f->merge_args); - if (f->http_proxy != NULL) - gpr_free(f->http_proxy); + if (f->http_proxy != NULL) gpr_free(f->http_proxy); gpr_free(f); } } @@ -219,8 +218,8 @@ static grpc_channel *client_channel_factory_create_channel( grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args, GRPC_CLIENT_CHANNEL, NULL); grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = grpc_resolver_create(target, &f->base, - &f->http_proxy); + grpc_resolver *resolver = + grpc_resolver_create(target, &f->base, &f->http_proxy); if (!resolver) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "client_channel_factory_create_channel"); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 73c093fcd2..e06ae9e02c 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -242,8 +242,7 @@ static void client_channel_factory_unref( "client_channel_factory"); } grpc_channel_args_destroy(f->merge_args); - if (f->http_proxy != NULL) - gpr_free(f->http_proxy); + if (f->http_proxy != NULL) gpr_free(f->http_proxy); gpr_free(f); } } @@ -285,8 +284,8 @@ static grpc_channel *client_channel_factory_create_channel( GRPC_CLIENT_CHANNEL, NULL); grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = grpc_resolver_create(target, &f->base, - &f->http_proxy); + grpc_resolver *resolver = + grpc_resolver_create(target, &f->base, &f->http_proxy); if (resolver != NULL) { grpc_client_channel_set_resolver( exec_ctx, grpc_channel_get_channel_stack(channel), resolver); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 6c608c8013..0a28f7b5e8 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1987,8 +1987,8 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_error *parse_error = GRPC_ERROR_NONE; for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) { - parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i], - NULL); + parse_error = + grpc_http_parser_parse(&parser, t->read_buffer.slices[i], NULL); } if (parse_error == GRPC_ERROR_NONE && (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) { diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index cbf26811f7..be9e9b6b63 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -337,11 +337,10 @@ grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice, size_t *start_of_body) { for (size_t i = 0; i < GPR_SLICE_LENGTH(slice); i++) { bool found_body_start = false; - grpc_error *err = addbyte(parser, GPR_SLICE_START_PTR(slice)[i], - &found_body_start); + grpc_error *err = + addbyte(parser, GPR_SLICE_START_PTR(slice)[i], &found_body_start); if (err != GRPC_ERROR_NONE) return err; - if (found_body_start && start_of_body != NULL) - *start_of_body = i + 1; + if (found_body_start && start_of_body != NULL) *start_of_body = i + 1; } return GRPC_ERROR_NONE; } diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c index d84f0b8cb9..612a3dbb83 100644 --- a/test/core/end2end/fixtures/h2_http_proxy.c +++ b/test/core/end2end/fixtures/h2_http_proxy.c @@ -38,10 +38,10 @@ #include #include #include +#include #include #include #include -#include #include "src/core/ext/client_config/client_channel.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/connected_channel.h" diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 0cdb2884ee..53132c1636 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -281,8 +281,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, // Read request and feed it to the parser. for (size_t i = 0; i < conn->client_read_buffer.count; ++i) { if (GPR_SLICE_LENGTH(conn->client_read_buffer.slices[i]) > 0) { - error = grpc_http_parser_parse( - &conn->http_parser, conn->client_read_buffer.slices[i], NULL); + error = grpc_http_parser_parse(&conn->http_parser, + conn->client_read_buffer.slices[i], NULL); if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, "HTTP proxy request parse", error); @@ -368,17 +368,17 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, // static void thread_main(void* arg) { - grpc_end2end_http_proxy *proxy = arg; + grpc_end2end_http_proxy* proxy = arg; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; do { const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); const gpr_timespec deadline = gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN)); - grpc_pollset_worker *worker = NULL; + grpc_pollset_worker* worker = NULL; gpr_mu_lock(proxy->mu); - GRPC_LOG_IF_ERROR("grpc_pollset_work", - grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, - now, deadline)); + GRPC_LOG_IF_ERROR( + "grpc_pollset_work", + grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, now, deadline)); gpr_mu_unlock(proxy->mu); grpc_exec_ctx_flush(&exec_ctx); } while (!proxy->shutdown); @@ -394,8 +394,8 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name); // Create TCP server. proxy->channel_args = grpc_channel_args_copy(NULL); - grpc_error* error = grpc_tcp_server_create( - NULL, proxy->channel_args, &proxy->server); + grpc_error* error = + grpc_tcp_server_create(NULL, proxy->channel_args, &proxy->server); GPR_ASSERT(error == GRPC_ERROR_NONE); // Bind to port. struct sockaddr_in addr; @@ -403,16 +403,16 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { addr.sin_family = AF_INET; grpc_sockaddr_set_port((struct sockaddr*)&addr, proxy_port); int port; - error = grpc_tcp_server_add_port( - proxy->server, (struct sockaddr*)&addr, sizeof(addr), &port); + error = grpc_tcp_server_add_port(proxy->server, (struct sockaddr*)&addr, + sizeof(addr), &port); GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(port == proxy_port); // Start server. proxy->pollset = gpr_malloc(grpc_pollset_size()); grpc_pollset_init(proxy->pollset, &proxy->mu); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, - on_accept, proxy); + grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, on_accept, + proxy); grpc_exec_ctx_finish(&exec_ctx); // Start proxy thread. gpr_thd_options opt = gpr_thd_options_default(); @@ -421,8 +421,8 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { return proxy; } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_pollset* pollset = arg; grpc_pollset_destroy(pollset); gpr_free(pollset); @@ -442,7 +442,7 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { grpc_exec_ctx_finish(&exec_ctx); } -const char *grpc_end2end_http_proxy_get_proxy_name( - grpc_end2end_http_proxy *proxy) { +const char* grpc_end2end_http_proxy_get_proxy_name( + grpc_end2end_http_proxy* proxy) { return proxy->proxy_name; } diff --git a/test/core/end2end/fixtures/http_proxy.h b/test/core/end2end/fixtures/http_proxy.h index 7af2ea92d0..cd47b432af 100644 --- a/test/core/end2end/fixtures/http_proxy.h +++ b/test/core/end2end/fixtures/http_proxy.h @@ -37,5 +37,5 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(); void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy); -const char *grpc_end2end_http_proxy_get_proxy_name( - grpc_end2end_http_proxy *proxy); +const char* grpc_end2end_http_proxy_get_proxy_name( + grpc_end2end_http_proxy* proxy); diff --git a/test/core/http/parser_test.c b/test/core/http/parser_test.c index 211690eff9..2fc354d9ee 100644 --- a/test/core/http/parser_test.c +++ b/test/core/http/parser_test.c @@ -62,8 +62,8 @@ static void test_request_succeeds(grpc_slice_split_mode split_mode, grpc_http_parser_init(&parser, GRPC_HTTP_REQUEST, &request); for (i = 0; i < num_slices; i++) { - GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i], NULL) - == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i], NULL) == + GRPC_ERROR_NONE); gpr_slice_unref(slices[i]); } GPR_ASSERT(grpc_http_parser_eof(&parser) == GRPC_ERROR_NONE); @@ -119,8 +119,8 @@ static void test_succeeds(grpc_slice_split_mode split_mode, char *response_text, grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response); for (i = 0; i < num_slices; i++) { - GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i], NULL) - == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i], NULL) == + GRPC_ERROR_NONE); gpr_slice_unref(slices[i]); } GPR_ASSERT(grpc_http_parser_eof(&parser) == GRPC_ERROR_NONE); -- cgit v1.2.3 From a9288afaa90ffe38d3b41d1b9ec697621f41be59 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 15 Aug 2016 14:48:28 -0700 Subject: Fix test to shutdown the proxy from the main thread instead of doing so after the first connection terminates. --- test/core/end2end/fixtures/http_proxy.c | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 53132c1636..eb5ea0422f 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -95,16 +95,12 @@ typedef struct proxy_connection { grpc_http_parser http_parser; grpc_http_request http_request; - - grpc_end2end_http_proxy* proxy; // Does not own. } proxy_connection; // Helper function to destroy the proxy connection. static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, proxy_connection* conn) { if (gpr_unref(&conn->refcount)) { - // Tell the server to shut down when this connection is closed. - conn->proxy->shutdown = true; grpc_endpoint_destroy(exec_ctx, conn->client_endpoint); if (conn->server_endpoint != NULL) grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); @@ -358,7 +354,6 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, gpr_slice_buffer_init(&conn->server_write_buffer); grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); - conn->proxy = proxy; grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, &conn->on_read_request_done); } @@ -429,6 +424,7 @@ static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* arg, } void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { + proxy->shutdown = true; // Signal proxy thread to shutdown. grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_thd_join(proxy->thd); grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); -- cgit v1.2.3 From 0c137e2e1f737d84e95805bf406a7f7e8938c065 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 18 Aug 2016 15:48:49 +0000 Subject: Fix tsan failures. --- test/core/end2end/fixtures/http_proxy.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index eb5ea0422f..b4c0dfba61 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -36,6 +36,7 @@ #include #include +#include #include #include #include @@ -65,7 +66,7 @@ struct grpc_end2end_http_proxy { grpc_channel_args* channel_args; gpr_mu* mu; grpc_pollset* pollset; - bool shutdown; + gpr_atm shutdown; }; // @@ -376,7 +377,7 @@ static void thread_main(void* arg) { grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, now, deadline)); gpr_mu_unlock(proxy->mu); grpc_exec_ctx_flush(&exec_ctx); - } while (!proxy->shutdown); + } while (!gpr_atm_acq_load(&proxy->shutdown)); grpc_exec_ctx_finish(&exec_ctx); } @@ -424,7 +425,7 @@ static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* arg, } void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { - proxy->shutdown = true; // Signal proxy thread to shutdown. + gpr_atm_rel_store(&proxy->shutdown, 1); // Signal proxy thread to shutdown. grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_thd_join(proxy->thd); grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); -- cgit v1.2.3 From 39b5871d7b1da79945c87f98acc4cbbd499ecfba Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 6 Sep 2016 12:50:42 -0700 Subject: Use http_proxy environment variable instead of URI query param. --- .../ext/client_config/http_connect_handshaker.c | 25 ++++++++++++++++++++++ .../ext/client_config/http_connect_handshaker.h | 4 ++++ src/core/ext/client_config/lb_policy_factory.h | 2 +- src/core/ext/client_config/resolver_registry.c | 5 +---- src/core/ext/client_config/resolver_registry.h | 7 ++---- src/core/ext/client_config/uri_parser.c | 5 ++--- src/core/ext/resolver/dns/native/dns_resolver.c | 8 ++++--- .../chttp2/client/insecure/channel_create.c | 11 +++++----- .../chttp2/client/secure/secure_channel_create.c | 11 +++++----- test/core/client_config/uri_parser_test.c | 2 -- test/core/end2end/fixtures/h2_http_proxy.c | 11 +++++----- test/core/end2end/fixtures/http_proxy.c | 2 +- 12 files changed, 57 insertions(+), 36 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c index 55b01bd46e..097465469e 100644 --- a/src/core/ext/client_config/http_connect_handshaker.c +++ b/src/core/ext/client_config/http_connect_handshaker.c @@ -40,9 +40,11 @@ #include #include +#include "src/core/ext/client_config/uri_parser.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/env.h" typedef struct http_connect_handshaker { // Base class. Must be first. @@ -247,3 +249,26 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, gpr_ref_init(&handshaker->refcount, 1); return &handshaker->base; } + +char* grpc_get_http_proxy_server() { + char* uri_str = gpr_getenv("http_proxy"); + if (uri_str == NULL) return NULL; + grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */); + char* proxy_name = NULL; + if (uri == NULL || uri->authority == NULL) { + gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var"); + goto done; + } + if (strcmp(uri->scheme, "http") != 0) { + gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme); + goto done; + } + if (strchr(uri->authority, '@') != NULL) { + gpr_log(GPR_ERROR, "userinfo not supported in proxy URI"); + goto done; + } + proxy_name = gpr_strdup(uri->authority); +done: + grpc_uri_destroy(uri); + return proxy_name; +} diff --git a/src/core/ext/client_config/http_connect_handshaker.h b/src/core/ext/client_config/http_connect_handshaker.h index 146ef9369a..1fc3948267 100644 --- a/src/core/ext/client_config/http_connect_handshaker.h +++ b/src/core/ext/client_config/http_connect_handshaker.h @@ -40,4 +40,8 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, const char* server_name); +/// Returns the name of the proxy to use, or NULL if no proxy is configured. +/// Caller takes ownership of result. +char* grpc_get_http_proxy_server(); + #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H */ diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index 5806deef9b..a9d3588767 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -48,7 +48,7 @@ struct grpc_lb_policy_factory { }; typedef struct grpc_lb_policy_args { - char *server_name; // Does not own. + char *server_name; grpc_resolved_addresses *addresses; grpc_client_channel_factory *client_channel_factory; } grpc_lb_policy_args; diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c index 5a8f137103..e7a4abd568 100644 --- a/src/core/ext/client_config/resolver_registry.c +++ b/src/core/ext/client_config/resolver_registry.c @@ -129,8 +129,7 @@ static grpc_resolver_factory *resolve_factory(const char *target, } grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory, - char **http_proxy) { + const char *target, grpc_client_channel_factory *client_channel_factory) { grpc_uri *uri = NULL; grpc_resolver_factory *factory = resolve_factory(target, &uri); grpc_resolver *resolver; @@ -139,8 +138,6 @@ grpc_resolver *grpc_resolver_create( args.uri = uri; args.client_channel_factory = client_channel_factory; resolver = grpc_resolver_factory_create_resolver(factory, &args); - const char *proxy = grpc_uri_get_query_arg(uri, "http_proxy"); - if (proxy != NULL) *http_proxy = gpr_strdup(proxy); grpc_uri_destroy(uri); return resolver; } diff --git a/src/core/ext/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h index 28843001ea..5ef1383cd3 100644 --- a/src/core/ext/client_config/resolver_registry.h +++ b/src/core/ext/client_config/resolver_registry.h @@ -54,12 +54,9 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); was not NULL). If a resolver factory was found, use it to instantiate a resolver and return it. - If a resolver factory was not found, return NULL. - If \a target specifies an http_proxy as a query arg, sets \a http_proxy - to the value (which the caller takes ownership of). */ + If a resolver factory was not found, return NULL. */ grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory, - char **http_proxy); + const char *target, grpc_client_channel_factory *client_channel_factory); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ diff --git a/src/core/ext/client_config/uri_parser.c b/src/core/ext/client_config/uri_parser.c index 5e8432c6c8..3ca1a58e69 100644 --- a/src/core/ext/client_config/uri_parser.c +++ b/src/core/ext/client_config/uri_parser.c @@ -118,8 +118,8 @@ static int parse_fragment_or_query(const char *uri_text, size_t *i) { const size_t advance = parse_pchar(uri_text, *i); /* pchar */ switch (advance) { case 0: /* uri_text[i] isn't in pchar */ - /* maybe it's ? or / or : */ - if (uri_text[*i] == '?' || uri_text[*i] == '/' || uri_text[*i] == ':') { + /* maybe it's ? or / */ + if (uri_text[*i] == '?' || uri_text[*i] == '/') { (*i)++; break; } else { @@ -282,7 +282,6 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { } const char *grpc_uri_get_query_arg(const grpc_uri *uri, const char *key) { - if (uri == NULL) return NULL; GPR_ASSERT(key != NULL); if (key[0] == '\0') return NULL; diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 5f41fdcc2f..5886f6dcbf 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -37,6 +37,7 @@ #include #include +#include "src/core/ext/client_config/http_connect_handshaker.h" #include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -262,10 +263,11 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, gpr_log(GPR_ERROR, "authority based dns uri's not supported"); return NULL; } - // Get name and (optionally) proxy address from args. + // Get name from args. const char *path = args->uri->path; if (path[0] == '/') ++path; - const char *proxy_name = grpc_uri_get_query_arg(args->uri, "http_proxy"); + // Get proxy name, if any. + char *proxy_name = grpc_get_http_proxy_server(); // Create resolver. dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); memset(r, 0, sizeof(*r)); @@ -273,7 +275,7 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &dns_resolver_vtable); r->target_name = gpr_strdup(path); - r->name_to_resolve = gpr_strdup(proxy_name == NULL ? path : proxy_name); + r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->default_port = gpr_strdup(default_port); r->client_channel_factory = args->client_channel_factory; gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 475224effd..14dc7f142f 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -162,7 +162,6 @@ typedef struct { gpr_refcount refs; grpc_channel_args *merge_args; grpc_channel *master; - char *http_proxy; } client_channel_factory; static void client_channel_factory_ref( @@ -180,7 +179,6 @@ static void client_channel_factory_unref( "client_channel_factory"); } grpc_channel_args_destroy(f->merge_args); - if (f->http_proxy != NULL) gpr_free(f->http_proxy); gpr_free(f); } } @@ -197,10 +195,12 @@ static grpc_subchannel *client_channel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); c->handshake_mgr = grpc_handshake_manager_create(); - if (f->http_proxy != NULL) { + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { grpc_handshake_manager_add( c->handshake_mgr, - grpc_http_connect_handshaker_create(f->http_proxy, args->server_name)); + grpc_http_connect_handshaker_create(proxy_name, args->server_name)); + gpr_free(proxy_name); } args->args = final_args; s = grpc_subchannel_create(exec_ctx, &c->base, args); @@ -218,8 +218,7 @@ static grpc_channel *client_channel_factory_create_channel( grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args, GRPC_CLIENT_CHANNEL, NULL); grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = - grpc_resolver_create(target, &f->base, &f->http_proxy); + grpc_resolver *resolver = grpc_resolver_create(target, &f->base); if (!resolver) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "client_channel_factory_create_channel"); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index e06ae9e02c..a9616e92d0 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -222,7 +222,6 @@ typedef struct { grpc_channel_args *merge_args; grpc_channel_security_connector *security_connector; grpc_channel *master; - char *http_proxy; } client_channel_factory; static void client_channel_factory_ref( @@ -242,7 +241,6 @@ static void client_channel_factory_unref( "client_channel_factory"); } grpc_channel_args_destroy(f->merge_args); - if (f->http_proxy != NULL) gpr_free(f->http_proxy); gpr_free(f); } } @@ -259,10 +257,12 @@ static grpc_subchannel *client_channel_factory_create_subchannel( c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; c->handshake_mgr = grpc_handshake_manager_create(); - if (f->http_proxy != NULL) { + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { grpc_handshake_manager_add( c->handshake_mgr, - grpc_http_connect_handshaker_create(f->http_proxy, args->server_name)); + grpc_http_connect_handshaker_create(proxy_name, args->server_name)); + gpr_free(proxy_name); } gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); @@ -284,8 +284,7 @@ static grpc_channel *client_channel_factory_create_channel( GRPC_CLIENT_CHANNEL, NULL); grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = - grpc_resolver_create(target, &f->base, &f->http_proxy); + grpc_resolver *resolver = grpc_resolver_create(target, &f->base); if (resolver != NULL) { grpc_client_channel_set_resolver( exec_ctx, grpc_channel_get_channel_stack(channel), resolver); diff --git a/test/core/client_config/uri_parser_test.c b/test/core/client_config/uri_parser_test.c index 4bc3d1e39f..323e8b6f70 100644 --- a/test/core/client_config/uri_parser_test.c +++ b/test/core/client_config/uri_parser_test.c @@ -142,8 +142,6 @@ int main(int argc, char **argv) { test_succeeds("http:?legit#twice", "http", "", "", "legit", "twice"); test_succeeds("http://foo?bar#lol?", "http", "foo", "", "bar", "lol?"); test_succeeds("http://foo?bar#lol?/", "http", "foo", "", "bar", "lol?/"); - test_succeeds("dns:///server:123?http_proxy=proxy:456", "dns", "", - "/server:123", "http_proxy=proxy:456", ""); test_fails("xyz"); test_fails("http:?dangling-pct-%0"); diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c index 612a3dbb83..a675a11f66 100644 --- a/test/core/end2end/fixtures/h2_http_proxy.c +++ b/test/core/end2end/fixtures/h2_http_proxy.c @@ -46,6 +46,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/http_server_filter.h" +#include "src/core/lib/support/env.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/server.h" #include "test/core/end2end/fixtures/http_proxy.h" @@ -76,12 +77,12 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f, grpc_channel_args *client_args) { fullstack_fixture_data *ffd = f->fixture_data; - char *target_uri; - gpr_asprintf(&target_uri, "%s?http_proxy=%s", ffd->server_addr, + char *proxy_uri; + gpr_asprintf(&proxy_uri, "http://%s", grpc_end2end_http_proxy_get_proxy_name(ffd->proxy)); - gpr_log(GPR_INFO, "target_uri: %s", target_uri); - f->client = grpc_insecure_channel_create(target_uri, client_args, NULL); - gpr_free(target_uri); + gpr_setenv("http_proxy", proxy_uri); + gpr_free(proxy_uri); + f->client = grpc_insecure_channel_create(ffd->server_addr, client_args, NULL); GPR_ASSERT(f->client); } diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index b4c0dfba61..c92f869be1 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -122,7 +122,7 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, proxy_connection* conn, bool is_client, const char* prefix, grpc_error* error) { const char* msg = grpc_error_string(error); - gpr_log(GPR_ERROR, "%s: %s", prefix, msg); + gpr_log(GPR_INFO, "%s: %s", prefix, msg); grpc_error_free_string(msg); grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); if (conn->server_endpoint != NULL) -- cgit v1.2.3 From 0209f675c44fc87f35cabd9c2ea623101e8e1cb0 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 6 Sep 2016 15:11:34 -0700 Subject: Fix HTTP proxy code to avoid having multiple outstanding writes. --- test/core/end2end/fixtures/http_proxy.c | 110 +++++++++++++++++++++----------- 1 file changed, 73 insertions(+), 37 deletions(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index c92f869be1..47f50069e2 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -90,8 +90,10 @@ typedef struct proxy_connection { grpc_closure on_server_write_done; gpr_slice_buffer client_read_buffer; + gpr_slice_buffer client_deferred_write_buffer; gpr_slice_buffer client_write_buffer; gpr_slice_buffer server_read_buffer; + gpr_slice_buffer server_deferred_write_buffer; gpr_slice_buffer server_write_buffer; grpc_http_parser http_parser; @@ -107,8 +109,10 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); grpc_pollset_set_destroy(conn->pollset_set); gpr_slice_buffer_destroy(&conn->client_read_buffer); + gpr_slice_buffer_destroy(&conn->client_deferred_write_buffer); gpr_slice_buffer_destroy(&conn->client_write_buffer); gpr_slice_buffer_destroy(&conn->server_read_buffer); + gpr_slice_buffer_destroy(&conn->server_deferred_write_buffer); gpr_slice_buffer_destroy(&conn->server_write_buffer); grpc_http_parser_destroy(&conn->http_parser); grpc_http_request_destroy(&conn->http_request); @@ -130,10 +134,6 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, proxy_connection_unref(exec_ctx, conn); } -// Forward declarations. -static void do_client_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn); -static void do_server_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn); - // Callback for writing proxy data to the client. static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { @@ -143,10 +143,20 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, "HTTP proxy client write", error); return; } - // Clear write buffer. + // Clear write buffer (the data we just wrote). gpr_slice_buffer_reset_and_unref(&conn->client_write_buffer); - // Unref the connection. - proxy_connection_unref(exec_ctx, conn); + // If more data was read from the server since we started this write, + // write that data now. + if (conn->client_deferred_write_buffer.length > 0) { + gpr_slice_buffer_move_into(&conn->client_deferred_write_buffer, + &conn->client_write_buffer); + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, + &conn->on_client_write_done); + } else { + // No more writes. Unref the connection. + proxy_connection_unref(exec_ctx, conn); + } } // Callback for writing proxy data to the backend server. @@ -158,10 +168,20 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, "HTTP proxy server write", error); return; } - // Clear write buffer. + // Clear write buffer (the data we just wrote). gpr_slice_buffer_reset_and_unref(&conn->server_write_buffer); - // Unref the connection. - proxy_connection_unref(exec_ctx, conn); + // If more data was read from the client since we started this write, + // write that data now. + if (conn->server_deferred_write_buffer.length > 0) { + gpr_slice_buffer_move_into(&conn->server_deferred_write_buffer, + &conn->server_write_buffer); + grpc_endpoint_write(exec_ctx, conn->server_endpoint, + &conn->server_write_buffer, + &conn->on_server_write_done); + } else { + // No more writes. Unref the connection. + proxy_connection_unref(exec_ctx, conn); + } } // Callback for reading data from the client, which will be proxied to @@ -174,14 +194,26 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, "HTTP proxy client read", error); return; } - // Move read data into write buffer and write it. - // The write operation inherits our reference to conn. - gpr_slice_buffer_move_into(&conn->client_read_buffer, - &conn->server_write_buffer); - grpc_endpoint_write(exec_ctx, conn->server_endpoint, - &conn->server_write_buffer, &conn->on_server_write_done); + // If there is already a pending write (i.e., server_write_buffer is + // not empty), then move the read data into server_deferred_write_buffer, + // and the next write will be requested in on_server_write_done(), when + // the current write is finished. + // + // Otherwise, move the read data into the write buffer and write it. + if (conn->client_write_buffer.length > 0) { + gpr_slice_buffer_move_into(&conn->client_read_buffer, + &conn->server_deferred_write_buffer); + } else { + gpr_slice_buffer_move_into(&conn->client_read_buffer, + &conn->server_write_buffer); + gpr_ref(&conn->refcount); + grpc_endpoint_write(exec_ctx, conn->server_endpoint, + &conn->server_write_buffer, + &conn->on_server_write_done); + } // Read more data. - do_client_read(exec_ctx, conn); + grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, + &conn->on_client_read_done); } // Callback for reading data from the backend server, which will be @@ -194,14 +226,26 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, "HTTP proxy server read", error); return; } - // Move read data into write buffer and write it. - // The write operation inherits our reference to conn. - gpr_slice_buffer_move_into(&conn->server_read_buffer, - &conn->client_write_buffer); - grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, &conn->on_client_write_done); + // If there is already a pending write (i.e., client_write_buffer is + // not empty), then move the read data into client_deferred_write_buffer, + // and the next write will be requested in on_client_write_done(), when + // the current write is finished. + // + // Otherwise, move the read data into the write buffer and write it. + if (conn->client_write_buffer.length > 0) { + gpr_slice_buffer_move_into(&conn->server_read_buffer, + &conn->client_deferred_write_buffer); + } else { + gpr_slice_buffer_move_into(&conn->server_read_buffer, + &conn->client_write_buffer); + gpr_ref(&conn->refcount); + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, + &conn->on_client_write_done); + } // Read more data. - do_server_read(exec_ctx, conn); + grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, + &conn->on_server_read_done); } // Callback to write the HTTP response for the CONNECT request. @@ -213,24 +257,14 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, "HTTP proxy write response", error); return; } - gpr_unref(&conn->refcount); // Clear write buffer. gpr_slice_buffer_reset_and_unref(&conn->client_write_buffer); - // Start reading from both client and server. - do_client_read(exec_ctx, conn); - do_server_read(exec_ctx, conn); -} - -// Start a read from the client. -static void do_client_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn) { + // Start reading from both client and server. One of the read + // requests inherits our ref to conn, but we need to take a new ref + // for the other one. gpr_ref(&conn->refcount); grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, &conn->on_client_read_done); -} - -// Start a read from the server. -static void do_server_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn) { - gpr_ref(&conn->refcount); grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, &conn->on_server_read_done); } @@ -350,8 +384,10 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_closure_init(&conn->on_server_read_done, on_server_read_done, conn); grpc_closure_init(&conn->on_server_write_done, on_server_write_done, conn); gpr_slice_buffer_init(&conn->client_read_buffer); + gpr_slice_buffer_init(&conn->client_deferred_write_buffer); gpr_slice_buffer_init(&conn->client_write_buffer); gpr_slice_buffer_init(&conn->server_read_buffer); + gpr_slice_buffer_init(&conn->server_deferred_write_buffer); gpr_slice_buffer_init(&conn->server_write_buffer); grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); -- cgit v1.2.3 From cd5f7ede27ced0f4c4129da004e544b19d992dd3 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 8 Sep 2016 07:40:23 -0700 Subject: Fix bug from previous commit. --- test/core/end2end/fixtures/http_proxy.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/core/end2end/fixtures/http_proxy.c') diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 47f50069e2..22533b9694 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -200,7 +200,7 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, // the current write is finished. // // Otherwise, move the read data into the write buffer and write it. - if (conn->client_write_buffer.length > 0) { + if (conn->server_write_buffer.length > 0) { gpr_slice_buffer_move_into(&conn->client_read_buffer, &conn->server_deferred_write_buffer); } else { -- cgit v1.2.3