From 339e421b29dc5a1369367ce6863a76ba8f3eda71 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 10 May 2017 12:52:45 -0700 Subject: Change endpoint interface to declare poller coveredness --- test/core/bad_client/bad_client.c | 5 +++-- test/core/end2end/bad_server_response_test.c | 8 +++++--- test/core/end2end/fixtures/http_proxy_fixture.c | 23 ++++++++++++----------- test/core/iomgr/endpoint_tests.c | 12 ++++++------ test/core/iomgr/tcp_posix_test.c | 11 ++++++----- test/core/security/secure_endpoint_test.c | 2 +- test/core/util/mock_endpoint.c | 6 ++++-- test/core/util/passthru_endpoint.c | 6 ++++-- test/core/util/trickle_endpoint.c | 10 ++++++---- 9 files changed, 47 insertions(+), 36 deletions(-) (limited to 'test/core') diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 8dbc5aa861..fd143a0c4c 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -153,7 +153,8 @@ void grpc_run_bad_client_test( grpc_schedule_on_exec_ctx); /* Write data */ - grpc_endpoint_write(&exec_ctx, sfd.client, &outgoing, &done_write_closure); + grpc_endpoint_write(&exec_ctx, sfd.client, &outgoing, true, + &done_write_closure); grpc_exec_ctx_finish(&exec_ctx); /* Await completion */ @@ -181,7 +182,7 @@ void grpc_run_bad_client_test( grpc_closure read_done_closure; grpc_closure_init(&read_done_closure, read_done, &args, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, sfd.client, &args.incoming, + grpc_endpoint_read(&exec_ctx, sfd.client, &args.incoming, true, &read_done_closure); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT( diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index fe7e674d17..f6897ef8c6 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -120,7 +120,8 @@ static void handle_write(grpc_exec_ctx *exec_ctx) { grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer); grpc_slice_buffer_add(&state.outgoing_buffer, slice); - grpc_endpoint_write(exec_ctx, state.tcp, &state.outgoing_buffer, &on_write); + grpc_endpoint_write(exec_ctx, state.tcp, &state.outgoing_buffer, true, + &on_write); } static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -142,7 +143,7 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) { handle_write(exec_ctx); } else { - grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, + grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, true, &on_read); } } @@ -159,7 +160,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, state.tcp = tcp; state.incoming_data_length = 0; grpc_endpoint_add_to_pollset(exec_ctx, tcp, server->pollset); - grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, true, + &on_read); } static gpr_timespec n_sec_deadline(int seconds) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c index f0d09487c6..a095f98c4c 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.c +++ b/test/core/end2end/fixtures/http_proxy_fixture.c @@ -170,7 +170,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer, &conn->client_write_buffer); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_client_write_done); } else { // No more writes. Unref the connection. @@ -195,7 +195,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer, &conn->server_write_buffer); grpc_endpoint_write(exec_ctx, conn->server_endpoint, - &conn->server_write_buffer, + &conn->server_write_buffer, true, &conn->on_server_write_done); } else { // No more writes. Unref the connection. @@ -227,12 +227,12 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, &conn->server_write_buffer); proxy_connection_ref(conn, "client_read"); grpc_endpoint_write(exec_ctx, conn->server_endpoint, - &conn->server_write_buffer, + &conn->server_write_buffer, true, &conn->on_server_write_done); } // Read more data. grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + true, &conn->on_client_read_done); } // Callback for reading data from the backend server, which will be @@ -259,12 +259,12 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, &conn->client_write_buffer); proxy_connection_ref(conn, "server_read"); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_client_write_done); } // Read more data. grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + true, &conn->on_server_read_done); } // Callback to write the HTTP response for the CONNECT request. @@ -285,9 +285,9 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection_ref(conn, "server_read"); proxy_connection_unref(exec_ctx, conn, "write_response"); grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + true, &conn->on_client_read_done); grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + true, &conn->on_server_read_done); } // Callback to connect to the backend server specified by the HTTP @@ -312,7 +312,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n"); grpc_slice_buffer_add(&conn->client_write_buffer, slice); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_write_response_done); } @@ -349,7 +349,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, // If we're not done reading the request, read more data. if (conn->http_parser.state != GRPC_HTTP_BODY) { grpc_endpoint_read(exec_ctx, conn->client_endpoint, - &conn->client_read_buffer, &conn->on_read_request_done); + &conn->client_read_buffer, true, + &conn->on_read_request_done); return; } // Make sure we got a CONNECT request. @@ -422,7 +423,7 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done); + true, &conn->on_read_request_done); } // diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index e274796e23..8ee94f27b0 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -143,7 +143,7 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); } else if (error == GRPC_ERROR_NONE) { - grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, + grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, true, &state->done_read); } } @@ -165,7 +165,7 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, &state->current_write_data); grpc_slice_buffer_reset_and_unref(&state->outgoing); grpc_slice_buffer_addn(&state->outgoing, slices, nslices); - grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing, + grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing, true, &state->done_write); gpr_free(slices); return; @@ -228,7 +228,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, read_and_write_test_write_handler(&exec_ctx, &state, GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); - grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, + grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, true, &state.done_read); if (shutdown) { @@ -296,19 +296,19 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); - grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 0); grpc_endpoint_shutdown(&exec_ctx, f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); wait_for_fail_count(&exec_ctx, &fail_count, 1); - grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 2); grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); - grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 3); diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 2c53a003d2..f1d1bd409a 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -164,7 +164,8 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, if (state->read_bytes >= state->target_read_bytes) { gpr_mu_unlock(g_mu); } else { - grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb); + grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, true, + &state->read_cb); gpr_mu_unlock(g_mu); } } @@ -200,7 +201,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -252,7 +253,7 @@ static void large_read_test(size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -393,7 +394,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_closure_init(&write_done_closure, write_done, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); + grpc_endpoint_write(&exec_ctx, ep, &outgoing, true, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); gpr_mu_lock(g_mu); for (;;) { @@ -463,7 +464,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 71d8057ac3..59c99b81f9 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -162,7 +162,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { grpc_slice_buffer_init(&incoming); grpc_closure_init(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, f.client_ep, &incoming, &done_closure); + grpc_endpoint_read(&exec_ctx, f.client_ep, &incoming, true, &done_closure); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(n == 1); GPR_ASSERT(incoming.count == 1); diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index c747297984..d55e82cb5d 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -56,7 +56,8 @@ typedef struct grpc_mock_endpoint { } grpc_mock_endpoint; static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { @@ -70,7 +71,8 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 6400845d23..bce0924e1b 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -71,7 +71,8 @@ struct passthru_endpoint { }; static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { half *m = (half *)ep; gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { @@ -93,7 +94,8 @@ static half *other_half(half *h) { } static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { half *m = other_half((half *)ep); gpr_mu_lock(&m->parent->mu); grpc_error *error = GRPC_ERROR_NONE; diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 69386a0718..a8b6c5b52c 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -61,9 +61,10 @@ typedef struct { } trickle_endpoint; static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; - grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); + grpc_endpoint_read(exec_ctx, te->wrapped, slices, covered_by_poller, cb); } static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, @@ -76,7 +77,8 @@ static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, } static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; gpr_mu_lock(&te->mu); GPR_ASSERT(te->write_cb == NULL); @@ -201,7 +203,7 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, te->writing = true; te->last_write = now; grpc_endpoint_write( - exec_ctx, te->wrapped, &te->writing_buffer, + exec_ctx, te->wrapped, &te->writing_buffer, true, grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); maybe_call_write_cb_locked(exec_ctx, te); } -- cgit v1.2.3