diff options
author | Mark D. Roth <roth@google.com> | 2016-07-29 07:59:25 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-07-29 07:59:25 -0700 |
commit | 9f709a4a7e60e0fdc0e4cfdaaa70ee2d226a3100 (patch) | |
tree | a762d9cd18e4dcabfe511214389fd80141bceba8 /test/core | |
parent | 64d88bdafdc913b2b5a9fb34e7181dc2e7db4d0e (diff) |
Work on debugging the HTTP proxy implementation.
Diffstat (limited to 'test/core')
-rw-r--r-- | test/core/end2end/fixtures/h2_http_proxy.c | 10 | ||||
-rw-r--r-- | test/core/end2end/fixtures/http_proxy.c | 138 | ||||
-rw-r--r-- | test/core/end2end/fixtures/http_proxy.h | 2 |
3 files changed, 119 insertions, 31 deletions
diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c index d84f0b8cb9..002e2cb16b 100644 --- a/test/core/end2end/fixtures/h2_http_proxy.c +++ b/test/core/end2end/fixtures/h2_http_proxy.c @@ -52,6 +52,7 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#if 0 typedef struct fullstack_fixture_data { char *server_addr; grpc_end2end_http_proxy *proxy; @@ -110,17 +111,24 @@ static grpc_end2end_test_config configs[] = { chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; +#endif int main(int argc, char **argv) { - size_t i; +// size_t i; grpc_test_init(argc, argv); grpc_end2end_tests_pre_init(); grpc_init(); + grpc_end2end_http_proxy* proxy = grpc_end2end_http_proxy_create(); + grpc_end2end_http_proxy_start_thread(proxy); + grpc_end2end_http_proxy_destroy(proxy); + +#if 0 for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(argc, argv, configs[i]); } +#endif grpc_shutdown(); diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index a8d68f0249..975deaeb30 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -41,7 +41,7 @@ #include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> -//#include <grpc/support/thd.h> +#include <grpc/support/thd.h> #include <grpc/support/useful.h> #include "src/core/lib/channel/channel_args.h" @@ -58,6 +58,16 @@ #include "src/core/lib/iomgr/tcp_server.h" #include "test/core/util/port.h" +struct grpc_end2end_http_proxy { + char* proxy_name; + gpr_thd_id thd; + grpc_tcp_server* server; + grpc_channel_args* channel_args; + gpr_mu* mu; + grpc_pollset* pollset; + bool shutdown; +}; + // // Connection handling // @@ -83,10 +93,16 @@ typedef struct connection_data { grpc_http_parser http_parser; grpc_http_request http_request; + + grpc_end2end_http_proxy* proxy; + + gpr_refcount refcount; } connection_data; static void connection_data_destroy(grpc_exec_ctx* exec_ctx, connection_data* cd) { +gpr_log(GPR_ERROR, "==> %s()", __func__); + cd->proxy->shutdown = true; grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); if (cd->server_endpoint != NULL) grpc_endpoint_destroy(exec_ctx, cd->server_endpoint); @@ -103,62 +119,96 @@ static void connection_data_destroy(grpc_exec_ctx* exec_ctx, static void connection_data_failed(grpc_exec_ctx* exec_ctx, connection_data* cd, const char* prefix, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "%s: %s", prefix, msg); grpc_error_free_string(msg); GRPC_ERROR_UNREF(error); +gpr_log(GPR_ERROR, "HERE 0"); grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); +gpr_log(GPR_ERROR, "HERE 1"); if (cd->server_endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); - connection_data_destroy(exec_ctx, cd); +gpr_log(GPR_ERROR, "HERE 2"); + if (gpr_unref(&cd->refcount)) { +gpr_log(GPR_ERROR, "HERE 2.5"); + connection_data_destroy(exec_ctx, cd); + } +gpr_log(GPR_ERROR, "HERE 3"); } static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; - if (error != GRPC_ERROR_NONE) + if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); + return; + } + // Clear write buffer. + gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); } static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; - if (error != GRPC_ERROR_NONE) + if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); + return; + } + // Clear write buffer. + gpr_slice_buffer_reset_and_unref(&cd->server_write_buffer); } static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); return; } + // Move read data into write buffer and write it. gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, &cd->on_server_write_done); + // Read more data. + grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, + &cd->on_client_read_done); } static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); return; } + // Move read data into write buffer and write it. gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, &cd->on_client_write_done); + // Read more data. + grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, + &cd->on_server_read_done); } static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); return; } - // Set up proxying. + // Clear write buffer. + gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + // Start reading from both client and server. + // We increase the refcount by one, since we already held one reference + // for ourselves, and there will now be two pending callbacks. + gpr_ref(&cd->refcount); grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, &cd->on_client_read_done); grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, @@ -167,6 +217,7 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); @@ -174,8 +225,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, } // We've established a connection, so send back a 200 response code to // the client. - gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n"); - gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n\r\n"); gpr_slice_buffer_add(&cd->client_write_buffer, slice); grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, &cd->on_write_response_done); @@ -183,6 +233,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); @@ -240,12 +291,14 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* ep, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { // FIXME: remove -gpr_log(GPR_ERROR, "==> on_accept()"); +gpr_log(GPR_ERROR, "==> %s()", __func__); + grpc_end2end_http_proxy* proxy = arg; // Instantiate connection_data. connection_data* cd = gpr_malloc(sizeof(*cd)); memset(cd, 0, sizeof(*cd)); cd->client_endpoint = ep; cd->pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(exec_ctx, cd->pollset_set, proxy->pollset); grpc_closure_init(&cd->on_read_request_done, on_read_request_done, cd); grpc_closure_init(&cd->on_server_connect_done, on_server_connect_done, cd); grpc_closure_init(&cd->on_write_response_done, on_write_response_done, cd); @@ -259,6 +312,8 @@ gpr_log(GPR_ERROR, "==> on_accept()"); gpr_slice_buffer_init(&cd->server_write_buffer); grpc_http_parser_init(&cd->http_parser, GRPC_HTTP_REQUEST, &cd->http_request); + cd->proxy = proxy; + gpr_ref_init(&cd->refcount, 1); grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, &cd->on_read_request_done); } @@ -267,25 +322,6 @@ gpr_log(GPR_ERROR, "==> on_accept()"); // Proxy class // -struct grpc_end2end_http_proxy { - char* proxy_name; -// gpr_thd_id thd; - grpc_tcp_server* server; - grpc_channel_args* channel_args; - gpr_mu* mu; - grpc_pollset* pollset; -}; - -#if 0 -static void thread_main(void *arg) { - //grpc_end2end_http_proxy *proxy = arg; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - while (true) { - grpc_exec_ctx_flush(&exec_ctx); - } -} -#endif - grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy)); memset(proxy, 0, sizeof(*proxy)); @@ -315,7 +351,7 @@ gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); grpc_pollset_init(proxy->pollset, &proxy->mu); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, - on_accept, NULL); + on_accept, proxy); grpc_exec_ctx_finish(&exec_ctx); #if 0 // Start proxy thread. @@ -331,15 +367,21 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_pollset_destroy(p); } +// FIXME: remove (including all references below) +//#define USE_THREAD 1 + void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { +gpr_log(GPR_ERROR, "==> %s()", __func__); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); grpc_tcp_server_unref(&exec_ctx, proxy->server); -// gpr_thd_join(proxy->thd); +#ifdef USE_THREAD + gpr_thd_join(proxy->thd); +#endif gpr_free(proxy->proxy_name); grpc_channel_args_destroy(proxy->channel_args); grpc_closure destroyed; - grpc_closure_init(&destroyed, destroy_pollset, &proxy->pollset); + grpc_closure_init(&destroyed, destroy_pollset, proxy->pollset); grpc_pollset_shutdown(&exec_ctx, proxy->pollset, &destroyed); gpr_free(proxy); grpc_exec_ctx_finish(&exec_ctx); @@ -349,3 +391,39 @@ const char *grpc_end2end_http_proxy_get_proxy_name( grpc_end2end_http_proxy *proxy) { return proxy->proxy_name; } + +static void thread_main(void* arg) { +gpr_log(GPR_ERROR, "==> %s()", __func__); + grpc_end2end_http_proxy *proxy = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + do { +gpr_log(GPR_ERROR, "HERE a"); + const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + const gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); + grpc_pollset_worker *worker = NULL; +gpr_log(GPR_ERROR, "HERE b"); + gpr_mu_lock(proxy->mu); +gpr_log(GPR_ERROR, "HERE c"); + GRPC_LOG_IF_ERROR("grpc_pollset_work", + grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, + now, deadline)); +gpr_log(GPR_ERROR, "HERE d"); + gpr_mu_unlock(proxy->mu); +gpr_log(GPR_ERROR, "HERE e"); + grpc_exec_ctx_flush(&exec_ctx); +gpr_log(GPR_ERROR, "HERE f"); + } while (!proxy->shutdown); +gpr_log(GPR_ERROR, "HERE g"); + grpc_exec_ctx_finish(&exec_ctx); +} + +void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy) { +#ifdef USE_THREAD + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); +#else + thread_main(proxy); +#endif +} diff --git a/test/core/end2end/fixtures/http_proxy.h b/test/core/end2end/fixtures/http_proxy.h index 7af2ea92d0..444324e7e0 100644 --- a/test/core/end2end/fixtures/http_proxy.h +++ b/test/core/end2end/fixtures/http_proxy.h @@ -39,3 +39,5 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy); const char *grpc_end2end_http_proxy_get_proxy_name( grpc_end2end_http_proxy *proxy); + +void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy); |