From 339e421b29dc5a1369367ce6863a76ba8f3eda71 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 10 May 2017 12:52:45 -0700 Subject: Change endpoint interface to declare poller coveredness --- test/core/bad_client/bad_client.c | 5 +++-- test/core/end2end/bad_server_response_test.c | 8 +++++--- test/core/end2end/fixtures/http_proxy_fixture.c | 23 ++++++++++++----------- test/core/iomgr/endpoint_tests.c | 12 ++++++------ test/core/iomgr/tcp_posix_test.c | 11 ++++++----- test/core/security/secure_endpoint_test.c | 2 +- test/core/util/mock_endpoint.c | 6 ++++-- test/core/util/passthru_endpoint.c | 6 ++++-- test/core/util/trickle_endpoint.c | 10 ++++++---- 9 files changed, 47 insertions(+), 36 deletions(-) (limited to 'test') diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 8dbc5aa861..fd143a0c4c 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -153,7 +153,8 @@ void grpc_run_bad_client_test( grpc_schedule_on_exec_ctx); /* Write data */ - grpc_endpoint_write(&exec_ctx, sfd.client, &outgoing, &done_write_closure); + grpc_endpoint_write(&exec_ctx, sfd.client, &outgoing, true, + &done_write_closure); grpc_exec_ctx_finish(&exec_ctx); /* Await completion */ @@ -181,7 +182,7 @@ void grpc_run_bad_client_test( grpc_closure read_done_closure; grpc_closure_init(&read_done_closure, read_done, &args, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, sfd.client, &args.incoming, + grpc_endpoint_read(&exec_ctx, sfd.client, &args.incoming, true, &read_done_closure); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT( diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index fe7e674d17..f6897ef8c6 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -120,7 +120,8 @@ static void handle_write(grpc_exec_ctx *exec_ctx) { grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer); grpc_slice_buffer_add(&state.outgoing_buffer, slice); - grpc_endpoint_write(exec_ctx, state.tcp, &state.outgoing_buffer, &on_write); + grpc_endpoint_write(exec_ctx, state.tcp, &state.outgoing_buffer, true, + &on_write); } static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -142,7 +143,7 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) { handle_write(exec_ctx); } else { - grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, + grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, true, &on_read); } } @@ -159,7 +160,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, state.tcp = tcp; state.incoming_data_length = 0; grpc_endpoint_add_to_pollset(exec_ctx, tcp, server->pollset); - grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, true, + &on_read); } static gpr_timespec n_sec_deadline(int seconds) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c index f0d09487c6..a095f98c4c 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.c +++ b/test/core/end2end/fixtures/http_proxy_fixture.c @@ -170,7 +170,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer, &conn->client_write_buffer); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_client_write_done); } else { // No more writes. Unref the connection. @@ -195,7 +195,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer, &conn->server_write_buffer); grpc_endpoint_write(exec_ctx, conn->server_endpoint, - &conn->server_write_buffer, + &conn->server_write_buffer, true, &conn->on_server_write_done); } else { // No more writes. Unref the connection. @@ -227,12 +227,12 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, &conn->server_write_buffer); proxy_connection_ref(conn, "client_read"); grpc_endpoint_write(exec_ctx, conn->server_endpoint, - &conn->server_write_buffer, + &conn->server_write_buffer, true, &conn->on_server_write_done); } // Read more data. grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + true, &conn->on_client_read_done); } // Callback for reading data from the backend server, which will be @@ -259,12 +259,12 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, &conn->client_write_buffer); proxy_connection_ref(conn, "server_read"); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_client_write_done); } // Read more data. grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + true, &conn->on_server_read_done); } // Callback to write the HTTP response for the CONNECT request. @@ -285,9 +285,9 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection_ref(conn, "server_read"); proxy_connection_unref(exec_ctx, conn, "write_response"); grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + true, &conn->on_client_read_done); grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + true, &conn->on_server_read_done); } // Callback to connect to the backend server specified by the HTTP @@ -312,7 +312,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n"); grpc_slice_buffer_add(&conn->client_write_buffer, slice); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_write_response_done); } @@ -349,7 +349,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, // If we're not done reading the request, read more data. if (conn->http_parser.state != GRPC_HTTP_BODY) { grpc_endpoint_read(exec_ctx, conn->client_endpoint, - &conn->client_read_buffer, &conn->on_read_request_done); + &conn->client_read_buffer, true, + &conn->on_read_request_done); return; } // Make sure we got a CONNECT request. @@ -422,7 +423,7 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done); + true, &conn->on_read_request_done); } // diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index e274796e23..8ee94f27b0 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -143,7 +143,7 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); } else if (error == GRPC_ERROR_NONE) { - grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, + grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, true, &state->done_read); } } @@ -165,7 +165,7 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, &state->current_write_data); grpc_slice_buffer_reset_and_unref(&state->outgoing); grpc_slice_buffer_addn(&state->outgoing, slices, nslices); - grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing, + grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing, true, &state->done_write); gpr_free(slices); return; @@ -228,7 +228,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, read_and_write_test_write_handler(&exec_ctx, &state, GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); - grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, + grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, true, &state.done_read); if (shutdown) { @@ -296,19 +296,19 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); - grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 0); grpc_endpoint_shutdown(&exec_ctx, f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); wait_for_fail_count(&exec_ctx, &fail_count, 1); - grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 2); grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); - grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 3); diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 2c53a003d2..f1d1bd409a 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -164,7 +164,8 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, if (state->read_bytes >= state->target_read_bytes) { gpr_mu_unlock(g_mu); } else { - grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb); + grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, true, + &state->read_cb); gpr_mu_unlock(g_mu); } } @@ -200,7 +201,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -252,7 +253,7 @@ static void large_read_test(size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -393,7 +394,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_closure_init(&write_done_closure, write_done, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); + grpc_endpoint_write(&exec_ctx, ep, &outgoing, true, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); gpr_mu_lock(g_mu); for (;;) { @@ -463,7 +464,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 71d8057ac3..59c99b81f9 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -162,7 +162,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { grpc_slice_buffer_init(&incoming); grpc_closure_init(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, f.client_ep, &incoming, &done_closure); + grpc_endpoint_read(&exec_ctx, f.client_ep, &incoming, true, &done_closure); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(n == 1); GPR_ASSERT(incoming.count == 1); diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index c747297984..d55e82cb5d 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -56,7 +56,8 @@ typedef struct grpc_mock_endpoint { } grpc_mock_endpoint; static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { @@ -70,7 +71,8 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 6400845d23..bce0924e1b 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -71,7 +71,8 @@ struct passthru_endpoint { }; static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { half *m = (half *)ep; gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { @@ -93,7 +94,8 @@ static half *other_half(half *h) { } static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { half *m = other_half((half *)ep); gpr_mu_lock(&m->parent->mu); grpc_error *error = GRPC_ERROR_NONE; diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 69386a0718..a8b6c5b52c 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -61,9 +61,10 @@ typedef struct { } trickle_endpoint; static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; - grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); + grpc_endpoint_read(exec_ctx, te->wrapped, slices, covered_by_poller, cb); } static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, @@ -76,7 +77,8 @@ static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, } static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; gpr_mu_lock(&te->mu); GPR_ASSERT(te->write_cb == NULL); @@ -201,7 +203,7 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, te->writing = true; te->last_write = now; grpc_endpoint_write( - exec_ctx, te->wrapped, &te->writing_buffer, + exec_ctx, te->wrapped, &te->writing_buffer, true, grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); maybe_call_write_cb_locked(exec_ctx, te); } -- cgit v1.2.3 From 31af436d969ac2b9f32eef6a17b2b02c877691e1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 14 Jul 2017 15:48:17 -0700 Subject: Fixes for async_end2end_test: Shutdown() can block --- test/cpp/end2end/async_end2end_test.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'test') diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 7b78071217..2a0adc9068 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -278,6 +278,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { } void TearDown() override { + gpr_tls_set(&g_is_async_end2end_test, 0); server_->Shutdown(); void* ignored_tag; bool ignored_ok; @@ -285,7 +286,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { while (cq_->Next(&ignored_tag, &ignored_ok)) ; poll_overrider_.reset(); - gpr_tls_set(&g_is_async_end2end_test, 0); grpc_recycle_unused_port(port_); } @@ -365,6 +365,7 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { ResetStub(); SendRpc(1); EXPECT_EQ(0, notify); + gpr_tls_set(&g_is_async_end2end_test, 0); server_->Shutdown(); wait_thread.join(); EXPECT_EQ(1, notify); @@ -373,8 +374,9 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { TEST_P(AsyncEnd2endTest, ShutdownThenWait) { ResetStub(); SendRpc(1); - server_->Shutdown(); + std::thread t([this]() { server_->Shutdown(); }); server_->Wait(); + t.join(); } // Test a simple RPC using the async version of Next -- cgit v1.2.3 From af723b0424df0047f965978d8f4bacee2af917c3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 17 Jul 2017 17:56:28 -0700 Subject: Debug aids --- .../transport/chttp2/transport/chttp2_transport.c | 50 +++++++++++++++------- src/core/lib/iomgr/executor.c | 32 ++++++++++++++ src/core/lib/surface/call.c | 3 ++ test/core/end2end/tests/resource_quota_server.c | 8 ++-- 4 files changed, 74 insertions(+), 19 deletions(-) (limited to 'test') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 9ff70e33b6..63bc786f1d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -886,6 +886,21 @@ static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, GPR_UNREACHABLE_CODE(return NULL); } +#define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i)) +static const char *begin_writing_desc(bool partial, bool inlined) { + switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) { + case WRITE_STATE_TUPLE_TO_INT(false, false): + return "begin write in background"; + case WRITE_STATE_TUPLE_TO_INT(false, true): + return "begin write in current thread"; + case WRITE_STATE_TUPLE_TO_INT(true, false): + return "begin partial write in background"; + case WRITE_STATE_TUPLE_TO_INT(true, true): + return "begin partial write in current thread"; + } + GPR_UNREACHABLE_CODE(return "bad state tuple"); +} + static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error_ignored) { GPR_TIMER_BEGIN("write_action_begin_locked", 0); @@ -898,15 +913,17 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, r = grpc_chttp2_begin_write(exec_ctx, t); } if (r.writing) { - set_write_state(exec_ctx, t, - r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE - : GRPC_CHTTP2_WRITE_STATE_WRITING, - r.partial ? "begin writing partial" : "begin writing"); - GRPC_CLOSURE_SCHED( - exec_ctx, - GRPC_CLOSURE_INIT(&t->write_action, write_action, t, - write_scheduler(t, r.early_results_scheduled)), - GRPC_ERROR_NONE); + grpc_closure_scheduler *scheduler = + write_scheduler(t, r.early_results_scheduled); + set_write_state( + exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE + : GRPC_CHTTP2_WRITE_STATE_WRITING, + begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx)); + GPR_ASSERT(scheduler == grpc_schedule_on_exec_ctx || + scheduler == grpc_executor_scheduler); + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action, + write_action, t, scheduler), + GRPC_ERROR_NONE); } else { set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); @@ -918,6 +935,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; GPR_TIMER_BEGIN("write_action", 0); + gpr_log(GPR_DEBUG, "W:%p write_action", t); grpc_endpoint_write( exec_ctx, t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, @@ -1104,12 +1122,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; if (GRPC_TRACER_ON(grpc_http_trace)) { const char *errstr = grpc_error_string(error); - gpr_log(GPR_DEBUG, - "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s", - closure, - (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), - (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), - desc, errstr); + gpr_log( + GPR_DEBUG, + "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s " + "write_state=%s", + t, closure, + (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), + (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), desc, + errstr, write_state_name(t->write_state)); } if (error != GRPC_ERROR_NONE) { if (closure->error_data.error == GRPC_ERROR_NONE) { diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 7621a7fe75..757bb6fe1b 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -49,6 +49,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; GPR_TLS_DECL(g_this_thread_state); +static grpc_tracer_flag executor_trace = + GRPC_TRACER_INITIALIZER(false, "executor"); + static void executor_thread(void *arg); static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { @@ -58,6 +61,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { while (c != NULL) { grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); +#endif + } #ifndef NDEBUG c->scheduled = false; #endif @@ -119,6 +130,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { } void grpc_executor_init(grpc_exec_ctx *exec_ctx) { + grpc_register_tracer(&executor_trace); gpr_atm_no_barrier_store(&g_cur_threads, 0); grpc_executor_set_threading(exec_ctx, true); } @@ -136,18 +148,31 @@ static void executor_thread(void *arg) { size_t subtract_depth = 0; for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, + "EXECUTOR[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", + ts - g_thread_state, subtract_depth); + } gpr_mu_lock(&ts->mu); ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } if (ts->shutdown) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: shutdown", + ts - g_thread_state); + } gpr_mu_unlock(&ts->mu); break; } grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: execute", + ts - g_thread_state); + } subtract_depth = run_closures(&exec_ctx, exec); grpc_exec_ctx_flush(&exec_ctx); @@ -159,6 +184,9 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error) { size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); if (cur_thread_count == 0) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure); + } grpc_closure_list_append(&exec_ctx->closure_list, closure, error); return; } @@ -166,6 +194,10 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, if (ts == NULL) { ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; } + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p to thread %" PRIdPTR, closure, + ts - g_thread_state); + } gpr_mu_lock(&ts->mu); if (grpc_closure_list_empty(ts->elems)) { gpr_cv_signal(&ts->cv); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2365d27307..0ab73c4fb4 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1172,6 +1172,9 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, } static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { + gpr_log(GPR_DEBUG, "finish_batch_step: tag=%p steps=%" PRIdPTR, + bctl->completion_data.notify_tag.tag, + gpr_atm_no_barrier_load(&bctl->steps_to_complete.count)); if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); } diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index 010e20c4c2..9322b26793 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -111,10 +111,10 @@ void resource_quota_server(grpc_end2end_test_config config) { grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024); #define NUM_CALLS 100 -#define CLIENT_BASE_TAG 1000 -#define SERVER_START_BASE_TAG 2000 -#define SERVER_RECV_BASE_TAG 3000 -#define SERVER_END_BASE_TAG 4000 +#define CLIENT_BASE_TAG 0x1000 +#define SERVER_START_BASE_TAG 0x2000 +#define SERVER_RECV_BASE_TAG 0x3000 +#define SERVER_END_BASE_TAG 0x4000 grpc_arg arg; arg.key = GRPC_ARG_RESOURCE_QUOTA; -- cgit v1.2.3 From cbb384007ed9f0c5409c7f7eb99187d018c87b4c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 21 Jul 2017 09:52:25 -0700 Subject: Fix bm_chttp2_transport --- test/cpp/microbenchmarks/bm_chttp2_transport.cc | 121 +++++++++++++++--------- 1 file changed, 75 insertions(+), 46 deletions(-) (limited to 'test') diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 567ef1cf24..4f80ff68f3 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -29,6 +29,7 @@ extern "C" { #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" @@ -154,23 +155,59 @@ class Fixture { grpc_transport *t_; }; -static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template +std::unique_ptr MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + GRPC_CLOSURE_INIT(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr(new C(f, sched)); +} + +template +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast(arg)->f_(exec_ctx, error); + delete static_cast(arg); + } + }; + auto *c = new C{f}; + return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); +} class Stream { public: Stream(Fixture *f) : f_(f) { - GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); stream_size_ = grpc_transport_stream_size(f->transport()); stream_ = gpr_malloc(stream_size_); arena_ = gpr_arena_create(4096); } ~Stream() { + gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_free(stream_); gpr_arena_destroy(arena_); } void Init(benchmark::State &state) { + GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this, + "test_stream"); + gpr_event_init(&done_); memset(stream_, 0, stream_size_); if ((state.iterations() & 0xffff) == 0) { gpr_arena_destroy(arena_); @@ -182,8 +219,12 @@ class Stream { } void DestroyThen(grpc_closure *closure) { - grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), - static_cast(stream_), closure); + destroy_closure_ = closure; +#ifndef NDEBUG + grpc_stream_unref(f_->exec_ctx(), &refcount_, "DestroyThen"); +#else + grpc_stream_unref(f_->exec_ctx(), &refcount_); +#endif } void Op(grpc_transport_stream_op_batch *op) { @@ -196,48 +237,24 @@ class Stream { } private: + static void FinishDestroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + auto stream = static_cast(arg); + grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(), + static_cast(stream->stream_), + stream->destroy_closure_); + gpr_event_set(&stream->done_, (void *)1); + } + Fixture *f_; grpc_stream_refcount refcount_; gpr_arena *arena_; size_t stream_size_; void *stream_; + grpc_closure *destroy_closure_ = nullptr; + gpr_event done_; }; -class Closure : public grpc_closure { - public: - virtual ~Closure() {} -}; - -template -std::unique_ptr MakeClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public Closure { - C(const F &f, grpc_closure_scheduler *sched) : f_(f) { - GRPC_CLOSURE_INIT(this, Execute, this, sched); - } - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast(arg)->f_(exec_ctx, error); - } - }; - return std::unique_ptr(new C(f, sched)); -} - -template -grpc_closure *MakeOnceClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public grpc_closure { - C(const F &f) : f_(f) {} - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast(arg)->f_(exec_ctx, error); - delete static_cast(arg); - } - }; - auto *c = new C{f}; - return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); -} - //////////////////////////////////////////////////////////////////////////////// // Benchmarks // @@ -246,10 +263,17 @@ static void BM_StreamCreateDestroy(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + memset(&op, 0, sizeof(op)); + op.cancel_stream = true; + op.payload = &op_payload; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; std::unique_ptr next = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; s.Init(state); + s.Op(&op); s.DestroyThen(next.get()); }); GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); @@ -350,6 +374,10 @@ static void BM_TransportEmptyOp(benchmark::State &state) { }); GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); + reset_op(); + op.cancel_stream = true; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); s.DestroyThen( MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); f.FlushExecCtx(); @@ -360,8 +388,8 @@ BENCHMARK(BM_TransportEmptyOp); static void BM_TransportStreamSend(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); - Stream s(&f); - s.Init(state); + auto s = std::unique_ptr(new Stream(&f)); + s->Init(state); grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload op_payload; auto reset_op = [&]() { @@ -391,30 +419,31 @@ static void BM_TransportStreamSend(benchmark::State &state) { MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; // force outgoing window to be yuge - s.chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024; + s->chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024; f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024; grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); reset_op(); op.on_complete = c.get(); op.send_message = true; op.payload->send_message.send_message = &send_stream.base; - s.Op(&op); + s->Op(&op); }); reset_op(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; op.on_complete = c.get(); - s.Op(&op); + s->Op(&op); f.FlushExecCtx(); reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( + s->Op(&op); + s->DestroyThen( MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); f.FlushExecCtx(); + s.reset(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); grpc_slice_buffer_destroy(&send_buffer); -- cgit v1.2.3 From 4c0ac4a9f547241a093355bc97c452be45a8270c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 21 Jul 2017 10:38:18 -0700 Subject: Fix fix fix chttp2_transport --- test/cpp/microbenchmarks/bm_chttp2_transport.cc | 61 +++++++++++++++---------- 1 file changed, 36 insertions(+), 25 deletions(-) (limited to 'test') diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 4f80ff68f3..290e67364e 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -218,17 +218,17 @@ class Stream { NULL, arena_); } - void DestroyThen(grpc_closure *closure) { + void DestroyThen(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { destroy_closure_ = closure; #ifndef NDEBUG - grpc_stream_unref(f_->exec_ctx(), &refcount_, "DestroyThen"); + grpc_stream_unref(exec_ctx, &refcount_, "DestroyThen"); #else - grpc_stream_unref(f_->exec_ctx(), &refcount_); + grpc_stream_unref(exec_ctx, &refcount_); #endif } - void Op(grpc_transport_stream_op_batch *op) { - grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(), + void Op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op) { + grpc_transport_perform_stream_op(exec_ctx, f_->transport(), static_cast(stream_), op); } @@ -273,8 +273,8 @@ static void BM_StreamCreateDestroy(benchmark::State &state) { MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; s.Init(state); - s.Op(&op); - s.DestroyThen(next.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, next.get()); }); GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -337,14 +337,14 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { op.on_complete = done.get(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; - s.Op(&op); + s.Op(exec_ctx, &op); }); done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen(start.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, start.get()); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -370,21 +370,23 @@ static void BM_TransportEmptyOp(benchmark::State &state) { if (!state.KeepRunning()) return; reset_op(); op.on_complete = c.get(); - s.Op(&op); + s.Op(exec_ctx, &op); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); reset_op(); op.cancel_stream = true; op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); } BENCHMARK(BM_TransportEmptyOp); +std::vector> done_events; + static void BM_TransportStreamSend(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); @@ -415,9 +417,15 @@ static void BM_TransportStreamSend(benchmark::State &state) { grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); } + gpr_event *bm_done = new gpr_event; + gpr_event_init(bm_done); + std::unique_ptr c = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { - if (!state.KeepRunning()) return; + if (!state.KeepRunning()) { + gpr_event_set(bm_done, (void *)1); + return; + } // force outgoing window to be yuge s->chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024; f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024; @@ -426,22 +434,25 @@ static void BM_TransportStreamSend(benchmark::State &state) { op.on_complete = c.get(); op.send_message = true; op.payload->send_message.send_message = &send_stream.base; - s->Op(&op); + s->Op(exec_ctx, &op); }); reset_op(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; op.on_complete = c.get(); - s->Op(&op); + s->Op(f.exec_ctx(), &op); f.FlushExecCtx(); + gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME)); + done_events.emplace_back(bm_done); + reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s->Op(&op); - s->DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s->Op(f.exec_ctx(), &op); + s->DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); s.reset(); track_counters.Finish(state); @@ -558,7 +569,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.recv_message = true; op.payload->recv_message.recv_message = &recv_stream; op.payload->recv_message.recv_message_ready = drain_start.get(); - s.Op(&op); + s.Op(exec_ctx, &op); f.PushInput(grpc_slice_ref(incoming_data)); }); @@ -601,7 +612,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.payload->recv_initial_metadata.recv_initial_metadata_ready = do_nothing.get(); op.on_complete = c.get(); - s.Op(&op); + s.Op(f.exec_ctx(), &op); f.PushInput(SLICE_FROM_BUFFER( "\x00\x00\x00\x04\x00\x00\x00\x00\x00" // Generated using: @@ -619,9 +630,9 @@ static void BM_TransportStreamRecv(benchmark::State &state) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); -- cgit v1.2.3 From 9bebf8b22a8e058a290f7c7e4b7278c193d17a81 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 1 Sep 2017 10:20:13 -0700 Subject: C++ API for write-through --- include/grpc++/impl/codegen/call.h | 9 +++++++++ test/cpp/end2end/end2end_test.cc | 16 ++++++++++++++++ 2 files changed, 25 insertions(+) (limited to 'test') diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 8e70225f86..74ed5cbfb9 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -169,6 +169,15 @@ class WriteOptions { return *this; } + /// Guarantee that all bytes have been written to the wire before completing + /// this write (usually writes are completed when they pass flow control) + inline WriteOptions& set_write_through() { + SetBit(GRPC_WRITE_THROUGH); + return *this; + } + + inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); } + /// Get value for the flag indicating that this is the last message, and /// should be coalesced with trailing metadata. /// diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 8bada48a2b..3e6ce9d9c9 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -731,6 +731,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { ResetStub(); EchoRequest request; -- cgit v1.2.3 From b675e0a0af91752492c9e981f5745f666631e1c9 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 6 Sep 2017 09:22:39 -0700 Subject: Annotate benign race --- include/grpc/impl/codegen/port_platform.h | 6 ++++++ test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index e84a75d295..5bbab70c6a 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -409,4 +409,10 @@ typedef unsigned __int64 uint64_t; #define CENSUSAPI GRPCAPI #endif +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) +#define GPR_ATTRIBUTE_NO_TSAN __attribute__((no_sanitize("thread"))) +#endif +#endif + #endif /* GRPC_IMPL_CODEGEN_PORT_PLATFORM_H */ diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 135b4710ce..37dd08c33d 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -105,7 +105,7 @@ class TrickledCHTTP2 : public EndpointPairFixture { (double)state.iterations()); } - void Log(int64_t iteration) { + void Log(int64_t iteration) GPR_ATTRIBUTE_NO_TSAN { auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_); grpc_chttp2_transport* client = reinterpret_cast(client_transport_); -- cgit v1.2.3 From 53e96fe773c48bff7c8037692e51337da6b0f012 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 6 Sep 2017 09:09:27 -0700 Subject: Fix ASAN detected failure --- test/core/iomgr/fd_conservation_posix_test.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'test') diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c index 3c61173ecd..d29b1e8e41 100644 --- a/test/core/iomgr/fd_conservation_posix_test.c +++ b/test/core/iomgr/fd_conservation_posix_test.c @@ -30,9 +30,8 @@ int main(int argc, char **argv) { grpc_endpoint_pair p; grpc_test_init(argc, argv); + grpc_init(); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_iomgr_init(&exec_ctx); - grpc_iomgr_start(&exec_ctx); /* set max # of file descriptors to a low value, and verify we can create and destroy many more than this number @@ -51,7 +50,7 @@ int main(int argc, char **argv) { grpc_resource_quota_unref(resource_quota); - grpc_iomgr_shutdown(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); + grpc_shutdown(); return 0; } -- cgit v1.2.3 From ed26a49b2cb3a337493b24273bdb2382e64b9e26 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 10 Sep 2017 20:59:11 -0700 Subject: Add annotation --- test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 37dd08c33d..59fb29dd60 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -193,7 +193,8 @@ class TrickledCHTTP2 : public EndpointPairFixture { return p; } - void UpdateStats(grpc_chttp2_transport* t, Stats* s, size_t backlog) { + void UpdateStats(grpc_chttp2_transport* t, Stats* s, + size_t backlog) GPR_ATTRIBUTE_NO_TSAN { if (backlog == 0) { if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != NULL) { s->streams_stalled_due_to_stream_flow_control++; -- cgit v1.2.3