diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-12-05 17:49:11 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2017-12-05 17:49:11 -0800 |
commit | 73bb67d054ecb952f10649cc42c998ab7ea8facd (patch) | |
tree | 181ee31adc7a144d8dcffed5e5de990ab4662cef /test/core/end2end/fixtures | |
parent | 65c45fbb4d576d99bcd1c00a13f430c8994fee38 (diff) | |
parent | 05cd3102b7b59bf5d71f66dc012be1f4ecdaad88 (diff) |
Merge master into execctx
Diffstat (limited to 'test/core/end2end/fixtures')
-rw-r--r-- | test/core/end2end/fixtures/h2_sockpair+trace.cc | 8 | ||||
-rw-r--r-- | test/core/end2end/fixtures/h2_sockpair.cc | 8 | ||||
-rw-r--r-- | test/core/end2end/fixtures/h2_sockpair_1byte.cc | 8 | ||||
-rw-r--r-- | test/core/end2end/fixtures/http_proxy_fixture.cc | 104 |
4 files changed, 86 insertions, 42 deletions
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc index 447bb91d0c..9807e929af 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.cc +++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc @@ -93,10 +93,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, true); client_setup_transport(&cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, @@ -108,9 +108,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc index c1f8ee0b64..b68279fd71 100644 --- a/test/core/end2end/fixtures/h2_sockpair.cc +++ b/test/core/end2end/fixtures/h2_sockpair.cc @@ -87,10 +87,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, true); client_setup_transport(&cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, @@ -102,9 +102,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc index 55b847bdae..350be138ca 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc @@ -98,10 +98,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, true); client_setup_transport(&cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, @@ -113,9 +113,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 539bbe4134..15c9dd4847 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -68,6 +68,9 @@ struct grpc_end2end_http_proxy { // Connection handling // +// proxy_connection structure is only accessed in the closures which are all +// scheduled under the same combiner lock. So there is is no need for a mutex to +// protect this structure. typedef struct proxy_connection { grpc_end2end_http_proxy* proxy; @@ -78,6 +81,8 @@ typedef struct proxy_connection { grpc_pollset_set* pollset_set; + // NOTE: All the closures execute under proxy->combiner lock. Which means + // there will not be any data-races between the closures grpc_closure on_read_request_done; grpc_closure on_server_connect_done; grpc_closure on_write_response_done; @@ -86,6 +91,13 @@ typedef struct proxy_connection { grpc_closure on_server_read_done; grpc_closure on_server_write_done; + bool client_read_failed : 1; + bool client_write_failed : 1; + bool client_shutdown : 1; + bool server_read_failed : 1; + bool server_write_failed : 1; + bool server_shutdown : 1; + grpc_slice_buffer client_read_buffer; grpc_slice_buffer client_deferred_write_buffer; bool client_is_writing; @@ -126,18 +138,50 @@ static void proxy_connection_unref(proxy_connection* conn, const char* reason) { } } +enum failure_type { + SETUP_FAILED, // To be used before we start proxying. + CLIENT_READ_FAILED, + CLIENT_WRITE_FAILED, + SERVER_READ_FAILED, + SERVER_WRITE_FAILED, +}; + // Helper function to shut down the proxy connection. -// Does NOT take ownership of a reference to error. -static void proxy_connection_failed(proxy_connection* conn, bool is_client, - const char* prefix, grpc_error* error) { - const char* msg = grpc_error_string(error); - gpr_log(GPR_INFO, "%s: %s", prefix, msg); - - grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error)); - if (conn->server_endpoint != nullptr) { +static void proxy_connection_failed(proxy_connection* conn, + failure_type failure, const char* prefix, + grpc_error* error) { + gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error)); + // Decide whether we should shut down the client and server. + bool shutdown_client = false; + bool shutdown_server = false; + if (failure == SETUP_FAILED) { + shutdown_client = true; + shutdown_server = true; + } else { + if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) || + (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) || + (failure == SERVER_READ_FAILED && !conn->client_is_writing)) { + shutdown_client = true; + } + if ((failure == SERVER_READ_FAILED && conn->server_write_failed) || + (failure == SERVER_WRITE_FAILED && conn->server_read_failed) || + (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) { + shutdown_server = true; + } + } + // If we decided to shut down either one and have not yet done so, do so. + if (shutdown_client && !conn->client_shutdown) { + grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error)); + conn->client_shutdown = true; + } + if (shutdown_server && !conn->server_shutdown && + (conn->server_endpoint != nullptr)) { grpc_endpoint_shutdown(conn->server_endpoint, GRPC_ERROR_REF(error)); + conn->server_shutdown = true; } + // Unref the connection. proxy_connection_unref(conn, "conn_failed"); + GRPC_ERROR_UNREF(error); } // Callback for writing proxy data to the client. @@ -145,8 +189,8 @@ static void on_client_write_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy client write", error); + proxy_connection_failed(conn, CLIENT_WRITE_FAILED, + "HTTP proxy client write", GRPC_ERROR_REF(error)); return; } // Clear write buffer (the data we just wrote). @@ -170,8 +214,8 @@ static void on_server_write_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->server_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, false /* is_client */, - "HTTP proxy server write", error); + proxy_connection_failed(conn, SERVER_WRITE_FAILED, + "HTTP proxy server write", GRPC_ERROR_REF(error)); return; } // Clear write buffer (the data we just wrote). @@ -195,8 +239,8 @@ static void on_server_write_done(void* arg, grpc_error* error) { static void on_client_read_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy client read", error); + proxy_connection_failed(conn, CLIENT_READ_FAILED, "HTTP proxy client read", + GRPC_ERROR_REF(error)); return; } // If there is already a pending write (i.e., server_write_buffer is @@ -226,8 +270,8 @@ static void on_client_read_done(void* arg, grpc_error* error) { static void on_server_read_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, false /* is_client */, - "HTTP proxy server read", error); + proxy_connection_failed(conn, SERVER_READ_FAILED, "HTTP proxy server read", + GRPC_ERROR_REF(error)); return; } // If there is already a pending write (i.e., client_write_buffer is @@ -257,8 +301,8 @@ static void on_write_response_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy write response", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy write response", + GRPC_ERROR_REF(error)); return; } // Clear write buffer. @@ -285,8 +329,8 @@ static void on_server_connect_done(void* arg, grpc_error* error) { // connection failed. However, for the purposes of this test code, // it's fine to pretend this is a client-side error, which will // cause the client connection to be dropped. - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy server connect", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy server connect", + GRPC_ERROR_REF(error)); return; } // We've established a connection, so send back a 200 response code to @@ -331,8 +375,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn, grpc_error_string(error)); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", + GRPC_ERROR_REF(error)); return; } // Read request and feed it to the parser. @@ -341,8 +385,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { error = grpc_http_parser_parse( &conn->http_parser, conn->client_read_buffer.slices[i], nullptr); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy request parse", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy request parse", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -362,8 +406,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { conn->http_request.method); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -382,8 +426,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { if (!client_authenticated) { const char* msg = "HTTP Connect could not verify authentication"; error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg); - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -393,8 +437,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { error = grpc_blocking_resolve_address(conn->http_request.path, "80", &resolved_addresses); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, "HTTP proxy DNS lookup", - error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy DNS lookup", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } |