diff options
author | 2018-08-26 03:50:39 -0700 | |
---|---|---|
committer | 2018-08-26 03:50:39 -0700 | |
commit | 9c5bde5e4e2cc0c81c3c6411b3aa922d3e995a54 (patch) | |
tree | 270d52c92efbf923855cd24efab620029af0aa7d | |
parent | f71fd84e42cd8053de35df7d6137f7c2e2407ce3 (diff) |
More commits
24 files changed, 361 insertions, 47 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index da55713062..af2341c299 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -242,6 +242,7 @@ add_dependencies(buildtests_c combiner_test) add_dependencies(buildtests_c compression_test) add_dependencies(buildtests_c concurrent_connectivity_test) add_dependencies(buildtests_c connection_refused_test) +add_dependencies(buildtests_c context_list_test) add_dependencies(buildtests_c dns_resolver_connectivity_test) add_dependencies(buildtests_c dns_resolver_cooldown_test) add_dependencies(buildtests_c dns_resolver_test) @@ -6175,6 +6176,33 @@ target_link_libraries(connection_refused_test endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(context_list_test + test/core/transport/chttp2/context_list_test.cc +) + + +target_include_directories(context_list_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} +) + +target_link_libraries(context_list_test + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(dns_resolver_connectivity_test test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc ) @@ -990,6 +990,7 @@ combiner_test: $(BINDIR)/$(CONFIG)/combiner_test compression_test: $(BINDIR)/$(CONFIG)/compression_test concurrent_connectivity_test: $(BINDIR)/$(CONFIG)/concurrent_connectivity_test connection_refused_test: $(BINDIR)/$(CONFIG)/connection_refused_test +context_list_test: $(BINDIR)/$(CONFIG)/context_list_test dns_resolver_connectivity_test: $(BINDIR)/$(CONFIG)/dns_resolver_connectivity_test dns_resolver_cooldown_test: $(BINDIR)/$(CONFIG)/dns_resolver_cooldown_test dns_resolver_test: $(BINDIR)/$(CONFIG)/dns_resolver_test @@ -1445,6 +1446,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/compression_test \ $(BINDIR)/$(CONFIG)/concurrent_connectivity_test \ $(BINDIR)/$(CONFIG)/connection_refused_test \ + $(BINDIR)/$(CONFIG)/context_list_test \ $(BINDIR)/$(CONFIG)/dns_resolver_connectivity_test \ $(BINDIR)/$(CONFIG)/dns_resolver_cooldown_test \ $(BINDIR)/$(CONFIG)/dns_resolver_test \ @@ -1972,6 +1974,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/concurrent_connectivity_test || ( echo test concurrent_connectivity_test failed ; exit 1 ) $(E) "[RUN] Testing connection_refused_test" $(Q) $(BINDIR)/$(CONFIG)/connection_refused_test || ( echo test connection_refused_test failed ; exit 1 ) + $(E) "[RUN] Testing context_list_test" + $(Q) $(BINDIR)/$(CONFIG)/context_list_test || ( echo test context_list_test failed ; exit 1 ) $(E) "[RUN] Testing dns_resolver_connectivity_test" $(Q) $(BINDIR)/$(CONFIG)/dns_resolver_connectivity_test || ( echo test dns_resolver_connectivity_test failed ; exit 1 ) $(E) "[RUN] Testing dns_resolver_cooldown_test" @@ -11107,6 +11111,38 @@ endif endif +CONTEXT_LIST_TEST_SRC = \ + test/core/transport/chttp2/context_list_test.cc \ + +CONTEXT_LIST_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CONTEXT_LIST_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/context_list_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/context_list_test: $(CONTEXT_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(CONTEXT_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/context_list_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/transport/chttp2/context_list_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a + +deps_context_list_test: $(CONTEXT_LIST_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(CONTEXT_LIST_TEST_OBJS:.o=.dep) +endif +endif + + DNS_RESOLVER_CONNECTIVITY_TEST_SRC = \ test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc \ diff --git a/build.yaml b/build.yaml index 5dcbce1df8..47a75d23c1 100644 --- a/build.yaml +++ b/build.yaml @@ -2280,6 +2280,15 @@ targets: - grpc - gpr_test_util - gpr +- name: context_list_test + build: test + language: c + src: + - test/core/transport/chttp2/context_list_test.cc + deps: + - grpc_test_util + - grpc + uses_polling: false - name: dns_resolver_connectivity_test cpu_cost: 0.1 build: test diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e24e19d903..f0dccefeee 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -31,6 +31,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/ext/transport/chttp2/transport/context_list.h" #include "src/core/ext/transport/chttp2/transport/frame_data.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/varint.h" @@ -155,6 +156,7 @@ bool g_flow_control_enabled = true; */ static void destruct_transport(grpc_chttp2_transport* t) { + gpr_log(GPR_INFO, "destruct transport %p", t); size_t i; grpc_endpoint_destroy(t->ep); @@ -164,6 +166,8 @@ static void destruct_transport(grpc_chttp2_transport* t) { grpc_slice_buffer_destroy_internal(&t->outbuf); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); + grpc_core::ContextList::Execute(t->cl, nullptr, GRPC_ERROR_NONE); + grpc_slice_buffer_destroy_internal(&t->read_buffer); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); @@ -236,7 +240,7 @@ static void init_transport(grpc_chttp2_transport* t, size_t i; int j; - grpc_tcp_set_write_timestamps_callback(ContextList::Execute); + grpc_tcp_set_write_timestamps_callback(grpc_core::ContextList::Execute); GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); @@ -1027,13 +1031,16 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { static void write_action(void* gt, grpc_error* error) { GPR_TIMER_SCOPE("write_action", 0); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt); - void *cl = t->cl; + void* cl = t->cl; t->cl = nullptr; + if (cl) { + gpr_log(GPR_INFO, "cleared for write"); + } grpc_endpoint_write( t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner)), cl - ); + grpc_combiner_scheduler(t->combiner)), + cl); } /* Callback from the grpc_endpoint after bytes have been written by calling @@ -1357,7 +1364,7 @@ static void perform_stream_op_locked(void* stream_op, GRPC_STATS_INC_HTTP2_OP_BATCHES(); - s->context = op->context; + s->context = op->payload->context; if (grpc_http_trace.enabled()) { char* str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str, diff --git a/src/core/ext/transport/chttp2/transport/context_list.cc b/src/core/ext/transport/chttp2/transport/context_list.cc index 69ecca09ab..7bfc947ebb 100644 --- a/src/core/ext/transport/chttp2/transport/context_list.cc +++ b/src/core/ext/transport/chttp2/transport/context_list.cc @@ -21,22 +21,32 @@ #include "src/core/ext/transport/chttp2/transport/context_list.h" namespace { -void (*cb)(void *, grpc_core::Timestamps*); +void (*cb)(void*, grpc_core::Timestamps*) = nullptr; } -void ContextList::Execute(ContextList *head, grpc_core::Timestamps *ts, grpc_error* error) { - ContextList *ptr; - while(head != nullptr) { - if(error == GRPC_ERROR_NONE) { - cb(head->context, ts); +namespace grpc_core { +void ContextList::Execute(void* arg, grpc_core::Timestamps* ts, + grpc_error* error) { + gpr_log(GPR_INFO, "execute"); + ContextList* head = static_cast<ContextList*>(arg); + ContextList* ptr; + while (head != nullptr) { + if (error == GRPC_ERROR_NONE && ts != nullptr) { + if (cb) { + cb(head->s->context, ts); + } } + gpr_log(GPR_INFO, "one iteration %p %p", head, arg); + // GRPC_CHTTP2_STREAM_UNREF(static_cast<grpc_chttp2_stream *>(head->s), + // "timestamp exec"); ptr = head; - head_ = head->next; + head = head->next; gpr_free(ptr); } } - -grpc_http2_set_write_timestamps_callback(void (*fn)(void *, grpc_core::Timestamps *)) { +void grpc_http2_set_write_timestamps_callback( + void (*fn)(void*, grpc_core::Timestamps*)) { cb = fn; } +} /* namespace grpc_core */ diff --git a/src/core/ext/transport/chttp2/transport/context_list.h b/src/core/ext/transport/chttp2/transport/context_list.h index 5411cf6bd8..26b8da413e 100644 --- a/src/core/ext/transport/chttp2/transport/context_list.h +++ b/src/core/ext/transport/chttp2/transport/context_list.h @@ -21,44 +21,55 @@ #include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/buffer_list.h" + +#include "src/core/ext/transport/chttp2/transport/internal.h" + namespace grpc_core { /** A list of RPC Contexts */ -class ContextList {\ +class ContextList { public: /* Creates a new element with \a context as the value and appends it to the * list. */ - void Append(ContextList **head, void *context) { + static void Append(ContextList** head, grpc_chttp2_stream* s) { /* Make sure context is not already present */ - ContextList *ptr = *head; - while(ptr != nullptr) { - if(ptr->context == context) { + ContextList* ptr = *head; + // GRPC_CHTTP2_STREAM_REF(s, "timestamp"); + while (ptr != nullptr) { + if (ptr->s == s) { GPR_ASSERT(false); } + ptr = ptr->next; } - ContextList *elem = static_cast<ContextListElement *>(gpr_malloc(sizeof(ContextList))); - elem->context = context; + ContextList* elem = + static_cast<ContextList*>(gpr_malloc(sizeof(ContextList))); + elem->s = s; elem->next = nullptr; - if(*head_ == nullptr) { + if (*head == nullptr) { *head = elem; + gpr_log(GPR_INFO, "new head"); + gpr_log(GPR_INFO, "append %p %p", elem, *head); return; } + gpr_log(GPR_INFO, "append %p %p", elem, *head); ptr = *head; - while(ptr->next != nullptr) { + while (ptr->next != nullptr) { ptr = ptr->next; } ptr->next = elem; } - /* Executes a function \a fn with each context in the list and \a arg. It also + /* Executes a function \a fn with each context in the list and \a ts. It also * frees up the entire list after this operation. */ - void Execute(ContextList *head, grpc_core::Timestamps *ts, grpc_error* error); + static void Execute(void* arg, grpc_core::Timestamps* ts, grpc_error* error); private: - void *context; - ContextListElement *next; + grpc_chttp2_stream* s; + ContextList* next; }; -grpc_http2_set_write_timestamps_callback(void (*fn)(void *, grpc_core::Timestamps*)); +void grpc_http2_set_write_timestamps_callback( + void (*fn)(void*, grpc_core::Timestamps*)); } /* namespace grpc_core */ #endif /* GRPC_CORE_EXT_TRANSPORT_CONTEXT_LIST_H */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 78595a6a4a..32a13df48c 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -44,6 +44,10 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/transport_impl.h" +namespace grpc_core { +class ContextList; +} + /* streams are kept in various linked lists depending on what things need to happen to them... this enum labels each list */ typedef enum { @@ -469,7 +473,7 @@ struct grpc_chttp2_transport { bool keepalive_permit_without_calls; /** keep-alive state machine state */ grpc_chttp2_keepalive_state keepalive_state; - ContextList *cl; + grpc_core::ContextList* cl; }; typedef enum { @@ -480,7 +484,7 @@ typedef enum { } grpc_published_metadata_method; struct grpc_chttp2_stream { - void *context; + void* context; grpc_chttp2_transport* t; grpc_stream_refcount* refcount; diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 51d62f4f60..c9273f7e39 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -18,6 +18,7 @@ #include <grpc/support/port_platform.h> +#include "src/core/ext/transport/chttp2/transport/context_list.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include <limits.h> @@ -487,7 +488,10 @@ class StreamWriteContext { return; // early out: nothing to do } - ContextList::Append(&t_->cl, s_->context); + if (/* traced && */ grpc_endpoint_can_track_err(t_->ep)) { + gpr_log(GPR_INFO, "for transport %p", t_); + grpc_core::ContextList::Append(&t_->cl, s_); + } while ((s_->flow_controlled_buffer.length > 0 || s_->compressed_data_buffer.length > 0) && data_send_context.max_outgoing() > 0) { diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc index 6ada23db1c..d7884a5965 100644 --- a/src/core/lib/iomgr/buffer_list.cc +++ b/src/core/lib/iomgr/buffer_list.cc @@ -31,6 +31,7 @@ namespace grpc_core { void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, void* arg) { + gpr_log(GPR_INFO, "new entry %u", seq_no); GPR_DEBUG_ASSERT(head != nullptr); TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg); /* Store the current time as the sendmsg time. */ @@ -64,6 +65,7 @@ void (*timestamps_callback)(void*, grpc_core::Timestamps*, void TracedBuffer::ProcessTimestamp(TracedBuffer** head, struct sock_extended_err* serr, struct scm_timestamping* tss) { + gpr_log(GPR_INFO, "process timestamp %u", serr->ee_data); GPR_DEBUG_ASSERT(head != nullptr); TracedBuffer* elem = *head; TracedBuffer* next = nullptr; @@ -85,6 +87,7 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head, /* Got all timestamps. Do the callback and free this TracedBuffer. * The thing below can be passed by value if we don't want the * restriction on the lifetime. */ + gpr_log(GPR_INFO, "calling"); timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE); next = elem->next_; Delete<TracedBuffer>(elem); @@ -99,18 +102,22 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head, } } -void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) { +void TracedBuffer::Shutdown(TracedBuffer** head, void* remaining, + grpc_error* shutdown_err) { GPR_DEBUG_ASSERT(head != nullptr); TracedBuffer* elem = *head; + gpr_log(GPR_INFO, "shutdown"); while (elem != nullptr) { - if (timestamps_callback) { - timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err); - } + timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err); + gpr_log(GPR_INFO, "iter"); auto* next = elem->next_; Delete<TracedBuffer>(elem); elem = next; } *head = nullptr; + if (remaining != nullptr) { + timestamps_callback(remaining, nullptr, shutdown_err); + } GRPC_ERROR_UNREF(shutdown_err); } diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h index cbbf50a657..f7d2f6c370 100644 --- a/src/core/lib/iomgr/buffer_list.h +++ b/src/core/lib/iomgr/buffer_list.h @@ -67,7 +67,7 @@ class TracedBuffer { /** Cleans the list by calling the callback for each traced buffer in the list * with timestamps that it has. */ - static void Shutdown(grpc_core::TracedBuffer** head, + static void Shutdown(grpc_core::TracedBuffer** head, void* remaining, grpc_error* shutdown_err); private: diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 44fb47e19d..5e5effb2f1 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -61,3 +61,10 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); } grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) { return ep->vtable->get_resource_user(ep); } + +bool grpc_endpoint_can_track_err(grpc_endpoint* ep) { + if (ep->vtable->can_track_err != nullptr) { + return ep->vtable->can_track_err(ep); + } + return false; +} diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 1f590a80ca..79c8ece263 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -47,6 +47,7 @@ struct grpc_endpoint_vtable { grpc_resource_user* (*get_resource_user)(grpc_endpoint* ep); char* (*get_peer)(grpc_endpoint* ep); int (*get_fd)(grpc_endpoint* ep); + bool (*can_track_err)(grpc_endpoint* ep); }; /* When data is available on the connection, calls the callback with slices. @@ -95,6 +96,8 @@ void grpc_endpoint_delete_from_pollset_set(grpc_endpoint* ep, grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* endpoint); +bool grpc_endpoint_can_track_err(grpc_endpoint* ep); + struct grpc_endpoint { const grpc_endpoint_vtable* vtable; }; diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 3afbfd7254..5c5c246f99 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_core::ExecCtx exec_ctx; gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args, "socketpair-client"); gpr_free(final_name); diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index e02a1898f2..f7a5f36cdc 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -326,6 +326,8 @@ static grpc_resource_user* endpoint_get_resource_user(grpc_endpoint* ep) { static int endpoint_get_fd(grpc_endpoint* ep) { return -1; } +static bool endpoint_can_track_err(grpc_endpoint* ep) { return false; } + static grpc_endpoint_vtable vtable = {endpoint_read, endpoint_write, endpoint_add_to_pollset, @@ -335,7 +337,8 @@ static grpc_endpoint_vtable vtable = {endpoint_read, endpoint_destroy, endpoint_get_resource_user, endpoint_get_peer, - endpoint_get_fd}; + endpoint_get_fd, + endpoint_can_track_err}; grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket, grpc_resource_quota* resource_quota, diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 1db2790265..23cbc20910 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -375,8 +375,15 @@ static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); } static void tcp_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); + gpr_log(GPR_INFO, "tcp destroy %p %p", ep, tcp->outgoing_buffer_arg); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } @@ -578,6 +585,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, ssize_t* sent_length, grpc_error** error) { if (!tcp->socket_ts_enabled) { + gpr_log(GPR_INFO, "set timestamps"); uint32_t opt = grpc_core::kTimestampingSocketOptions; if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, static_cast<void*>(&opt), sizeof(opt)) != 0) { @@ -610,6 +618,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, *sent_length = length; /* Only save timestamps if all the bytes were taken by sendmsg. */ if (sending_length == static_cast<size_t>(length)) { + gpr_log(GPR_INFO, "tcp new entry %p %p", tcp, tcp->outgoing_buffer_arg); gpr_mu_lock(&tcp->tb_mu); grpc_core::TracedBuffer::AddNewEntry( &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length), @@ -668,6 +677,7 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, * non-linux platforms, error processing is not used/enabled currently. */ static bool process_errors(grpc_tcp* tcp) { + gpr_log(GPR_INFO, "process errors"); while (true) { struct iovec iov; iov.iov_base = nullptr; @@ -730,6 +740,8 @@ static bool process_errors(grpc_tcp* tcp) { } static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { + gpr_log(GPR_INFO, "handle error %p", arg); + GRPC_LOG_IF_ERROR("handle error", GRPC_ERROR_REF(error)); grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error)); @@ -739,7 +751,8 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) { /* We aren't going to register to hear on error anymore, so it is safe to * unref. */ - grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error)); + gpr_log(GPR_INFO, "%p %d early return", error, + static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))); TCP_UNREF(tcp, "error-tracking"); return; } @@ -774,6 +787,17 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { } #endif /* GRPC_LINUX_ERRQUEUE */ +void tcp_shutdown_buffer_list(grpc_tcp* tcp) { + if (tcp->outgoing_buffer_arg) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; + } +} + /* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX @@ -821,8 +845,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_flags = 0; if (tcp->outgoing_buffer_arg != nullptr) { if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, - error)) + error)) { + gpr_log(GPR_INFO, "something went wrong"); + tcp_shutdown_buffer_list(tcp); return true; /* something went wrong with timestamps */ + } } else { msg.msg_control = nullptr; msg.msg_controllen = 0; @@ -844,12 +871,16 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } return false; } else if (errno == EPIPE) { + gpr_log(GPR_INFO, "something went wrong"); *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + tcp_shutdown_buffer_list(tcp); return true; } else { + gpr_log(GPR_INFO, "something went wrong"); *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + tcp_shutdown_buffer_list(tcp); return true; } } @@ -910,6 +941,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, grpc_closure* cb, void* arg) { GPR_TIMER_SCOPE("tcp_write", 0); + gpr_log(GPR_INFO, "tcp_write %p %p", ep, arg); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_error* error = GRPC_ERROR_NONE; @@ -926,17 +958,18 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, GPR_ASSERT(tcp->write_cb == nullptr); + tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { GRPC_CLOSURE_SCHED( cb, grpc_fd_is_shutdown(tcp->em_fd) ? tcp_annotate_error( GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp) : GRPC_ERROR_NONE); + tcp_shutdown_buffer_list(tcp); return; } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; - tcp->outgoing_buffer_arg = arg; if (arg) { GPR_ASSERT(grpc_event_engine_can_track_errors()); } @@ -949,6 +982,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, } notify_on_write(tcp); } else { + gpr_log(GPR_INFO, "imm sched"); if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); @@ -989,6 +1023,23 @@ static grpc_resource_user* tcp_get_resource_user(grpc_endpoint* ep) { return tcp->resource_user; } +static bool tcp_can_track_err(grpc_endpoint* ep) { + grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); + if (!grpc_event_engine_can_track_errors()) { + return false; + } + struct sockaddr addr; + socklen_t len = sizeof(addr); + if (getsockname(tcp->fd, &addr, &len) < 0) { + gpr_log(GPR_ERROR, "getsockname"); + return false; + } + if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) { + return true; + } + return false; +} + static const grpc_endpoint_vtable vtable = {tcp_read, tcp_write, tcp_add_to_pollset, @@ -998,7 +1049,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_destroy, tcp_get_resource_user, tcp_get_peer, - tcp_get_fd}; + tcp_get_fd, + tcp_can_track_err}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 @@ -1059,6 +1111,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->is_first_read = true; tcp->bytes_counter = -1; tcp->socket_ts_enabled = false; + tcp->outgoing_buffer_arg = nullptr; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -1097,12 +1150,19 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, grpc_closure* done) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); + gpr_log(GPR_INFO, "destroy and release %p %p", ep, tcp->outgoing_buffer_arg); GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { /* Stop errors notification. */ + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index f40f969bb7..9b103834e1 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -389,6 +389,11 @@ static grpc_resource_user* endpoint_get_resource_user( return grpc_endpoint_get_resource_user(ep->wrapped_ep); } +static bool endpoint_can_track_err(grpc_endpoint* secure_ep) { + secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep); + return grpc_endpoint_can_track_err(ep->wrapped_ep); +} + static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_write, endpoint_add_to_pollset, @@ -398,7 +403,8 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_destroy, endpoint_get_resource_user, endpoint_get_peer, - endpoint_get_fd}; + endpoint_get_fd, + endpoint_can_track_err}; grpc_endpoint* grpc_secure_endpoint_create( struct tsi_frame_protector* protector, diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc index f1773580bd..1086e42fcc 100644 --- a/test/core/iomgr/buffer_list_test.cc +++ b/test/core/iomgr/buffer_list_test.cc @@ -50,7 +50,7 @@ static void TestShutdownFlushesList() { grpc_core::TracedBuffer::AddNewEntry( &list, i, static_cast<void*>(&verifier_called[i])); } - grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE); + grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE); GPR_ASSERT(list == nullptr); for (auto i = 0; i < NUM_ELEM; i++) { GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) == @@ -88,7 +88,7 @@ static void TestVerifierCalledOnAck() { grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss); GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1)); GPR_ASSERT(list == nullptr); - grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE); + grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE); } static void TestTcpBufferList() { diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index 6eff716b01..c7bfa1ec09 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -67,6 +67,19 @@ grpc_cc_test( ) grpc_cc_test( + name = "context_list_test", + srcs = ["context_list_test.cc"], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], +) + + +grpc_cc_test( name = "hpack_encoder_test", srcs = ["hpack_encoder_test.cc"], language = "C++", diff --git a/test/core/transport/chttp2/context_list_test.cc b/test/core/transport/chttp2/context_list_test.cc new file mode 100644 index 0000000000..1f7a38a107 --- /dev/null +++ b/test/core/transport/chttp2/context_list_test.cc @@ -0,0 +1,64 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#include "src/core/ext/transport/chttp2/transport/context_list.h" + +#include <grpc/grpc.h> + +#include "test/core/util/test_config.h" + +static void TestExecuteFlushesListVerifier(void* arg, + grpc_core::Timestamps* ts) { + GPR_ASSERT(arg != nullptr); + gpr_atm* done = reinterpret_cast<gpr_atm*>(arg); + gpr_atm_rel_store(done, static_cast<gpr_atm>(1)); +} + +/** Tests that all ContextList elements in the list are flushed out on + * execute. + * Also tests that arg is passed correctly. + */ +static void TestExecuteFlushesList() { + grpc_core::ContextList* list = nullptr; + grpc_http2_set_write_timestamps_callback(TestExecuteFlushesListVerifier); +#define NUM_ELEM 5 + grpc_chttp2_stream s[NUM_ELEM]; + gpr_atm verifier_called[NUM_ELEM]; + for (auto i = 0; i < NUM_ELEM; i++) { + s[i].context = &verifier_called[i]; + gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0)); + grpc_core::ContextList::Append(&list, &s[i]); + } + grpc_core::Timestamps ts; + grpc_core::ContextList::Execute(list, &ts, GRPC_ERROR_NONE); + for (auto i = 0; i < NUM_ELEM; i++) { + GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) == + static_cast<gpr_atm>(1)); + } +} + +static void TestContextList() { TestExecuteFlushesList(); } + +int main(int argc, char** argv) { + grpc_init(); + TestContextList(); + grpc_shutdown(); + return 0; +} diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc index ef6fd62b51..570edf18e5 100644 --- a/test/core/util/mock_endpoint.cc +++ b/test/core/util/mock_endpoint.cc @@ -114,6 +114,7 @@ static const grpc_endpoint_vtable vtable = { me_get_resource_user, me_get_peer, me_get_fd, + nullptr, }; grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice), diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index 3cc8ad6fe1..835a39394c 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -171,6 +171,7 @@ static const grpc_endpoint_vtable vtable = { me_get_resource_user, me_get_peer, me_get_fd, + nullptr, }; static void half_init(half* m, passthru_endpoint* parent, diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc index 62ed72a629..8d93db05e6 100644 --- a/test/core/util/trickle_endpoint.cc +++ b/test/core/util/trickle_endpoint.cc @@ -148,7 +148,8 @@ static const grpc_endpoint_vtable vtable = {te_read, te_destroy, te_get_resource_user, te_get_peer, - te_get_fd}; + te_get_fd, + nullptr}; grpc_endpoint* grpc_trickle_endpoint_create(grpc_endpoint* wrap, double bytes_per_second) { diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 7a679aa3b9..eb5e10abac 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -366,6 +366,21 @@ }, { "deps": [ + "grpc", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c", + "name": "context_list_test", + "src": [ + "test/core/transport/chttp2/context_list_test.cc" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ "gpr", "gpr_test_util", "grpc", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index fba76d69d1..bda78360b1 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -442,6 +442,30 @@ "posix", "windows" ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "context_list_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], "cpu_cost": 0.1, "exclude_configs": [], "exclude_iomgrs": [ |