diff options
author | Muxi Yan <muxi@users.noreply.github.com> | 2018-12-10 10:15:45 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-10 10:15:45 -0800 |
commit | 037173217011b38ff4675d028eba27a068db5975 (patch) | |
tree | 7bf135f4481365dad05cb4ce181cc8259647a3da /src/core/lib/iomgr | |
parent | 3f00d61b04874cc5f0159c16f2c598a8f2fb93a7 (diff) | |
parent | 60f2d379fec3364ff59f4f0d463b16275525863d (diff) |
Merge branch 'master' into config-isolation
Diffstat (limited to 'src/core/lib/iomgr')
36 files changed, 575 insertions, 103 deletions
diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc index 6ada23db1c..ace17a108d 100644 --- a/src/core/lib/iomgr/buffer_list.cc +++ b/src/core/lib/iomgr/buffer_list.cc @@ -35,6 +35,9 @@ void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg); /* Store the current time as the sendmsg time. */ new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME); + new_elem->ts_.scheduled_time = gpr_inf_past(GPR_CLOCK_REALTIME); + new_elem->ts_.sent_time = gpr_inf_past(GPR_CLOCK_REALTIME); + new_elem->ts_.acked_time = gpr_inf_past(GPR_CLOCK_REALTIME); if (*head == nullptr) { *head = new_elem; return; @@ -55,10 +58,16 @@ void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) { gts->clock_type = GPR_CLOCK_REALTIME; } +void default_timestamps_callback(void* arg, grpc_core::Timestamps* ts, + grpc_error* shudown_err) { + gpr_log(GPR_DEBUG, "Timestamps callback has not been registered"); +} + /** The saved callback function that will be invoked when we get all the * timestamps that we are going to get for a TracedBuffer. */ void (*timestamps_callback)(void*, grpc_core::Timestamps*, - grpc_error* shutdown_err); + grpc_error* shutdown_err) = + default_timestamps_callback; } /* namespace */ void TracedBuffer::ProcessTimestamp(TracedBuffer** head, @@ -99,18 +108,20 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head, } } -void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) { +void TracedBuffer::Shutdown(TracedBuffer** head, void* remaining, + grpc_error* shutdown_err) { GPR_DEBUG_ASSERT(head != nullptr); TracedBuffer* elem = *head; while (elem != nullptr) { - if (timestamps_callback) { - timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err); - } + timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err); auto* next = elem->next_; Delete<TracedBuffer>(elem); elem = next; } *head = nullptr; + if (remaining != nullptr) { + timestamps_callback(remaining, nullptr, shutdown_err); + } GRPC_ERROR_UNREF(shutdown_err); } diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h index cbbf50a657..627f1bde99 100644 --- a/src/core/lib/iomgr/buffer_list.h +++ b/src/core/lib/iomgr/buffer_list.h @@ -37,6 +37,8 @@ struct Timestamps { gpr_timespec scheduled_time; gpr_timespec sent_time; gpr_timespec acked_time; + + uint32_t byte_offset; /* byte offset relative to the start of the RPC */ }; /** TracedBuffer is a class to keep track of timestamps for a specific buffer in @@ -67,13 +69,13 @@ class TracedBuffer { /** Cleans the list by calling the callback for each traced buffer in the list * with timestamps that it has. */ - static void Shutdown(grpc_core::TracedBuffer** head, + static void Shutdown(grpc_core::TracedBuffer** head, void* remaining, grpc_error* shutdown_err); private: GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - TracedBuffer(int seq_no, void* arg) + TracedBuffer(uint32_t seq_no, void* arg) : seq_no_(seq_no), arg_(arg), next_(nullptr) {} uint32_t seq_no_; /* The sequence number for the last byte in the buffer */ @@ -82,7 +84,12 @@ class TracedBuffer { grpc_core::TracedBuffer* next_; /* The next TracedBuffer in the list */ }; #else /* GRPC_LINUX_ERRQUEUE */ -class TracedBuffer {}; +class TracedBuffer { + public: + /* Dummy shutdown function */ + static void Shutdown(grpc_core::TracedBuffer** head, void* remaining, + grpc_error* shutdown_err) {} +}; #endif /* GRPC_LINUX_ERRQUEUE */ /** Sets the callback function to call when timestamps for a write are diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index 00a839b64c..6b5759a036 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -39,8 +39,57 @@ static gpr_atm encode_cancel_state_error(grpc_error* error) { return static_cast<gpr_atm>(1) | (gpr_atm)error; } +#ifdef GRPC_TSAN_ENABLED +static void tsan_closure(void* user_data, grpc_error* error) { + grpc_call_combiner* call_combiner = + static_cast<grpc_call_combiner*>(user_data); + // We ref-count the lock, and check if it's already taken. + // If it was taken, we should do nothing. Otherwise, we will mark it as + // locked. Note that if two different threads try to do this, only one of + // them will be able to mark the lock as acquired, while they both run their + // callbacks. In such cases (which should never happen for call_combiner), + // TSAN will correctly produce an error. + // + // TODO(soheil): This only covers the callbacks scheduled by + // grpc_call_combiner_(start|finish). If in the future, a + // callback gets scheduled using other mechanisms, we will need + // to add APIs to externally lock call combiners. + grpc_core::RefCountedPtr<grpc_call_combiner::TsanLock> lock = + call_combiner->tsan_lock; + bool prev = false; + if (lock->taken.compare_exchange_strong(prev, true)) { + TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true); + } else { + lock.reset(); + } + GRPC_CLOSURE_RUN(call_combiner->original_closure, GRPC_ERROR_REF(error)); + if (lock != nullptr) { + TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true); + bool prev = true; + GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false)); + } +} +#endif + +static void call_combiner_sched_closure(grpc_call_combiner* call_combiner, + grpc_closure* closure, + grpc_error* error) { +#ifdef GRPC_TSAN_ENABLED + call_combiner->original_closure = closure; + GRPC_CLOSURE_SCHED(&call_combiner->tsan_closure, error); +#else + GRPC_CLOSURE_SCHED(closure, error); +#endif +} + void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { + gpr_atm_no_barrier_store(&call_combiner->cancel_state, 0); + gpr_atm_no_barrier_store(&call_combiner->size, 0); gpr_mpscq_init(&call_combiner->queue); +#ifdef GRPC_TSAN_ENABLED + GRPC_CLOSURE_INIT(&call_combiner->tsan_closure, tsan_closure, call_combiner, + grpc_schedule_on_exec_ctx); +#endif } void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { @@ -85,7 +134,7 @@ void grpc_call_combiner_start(grpc_call_combiner* call_combiner, gpr_log(GPR_INFO, " EXECUTING IMMEDIATELY"); } // Queue was empty, so execute this closure immediately. - GRPC_CLOSURE_SCHED(closure, error); + call_combiner_sched_closure(call_combiner, closure, error); } else { if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, " QUEUING"); @@ -132,7 +181,8 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS, gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s", closure, grpc_error_string(closure->error_data.error)); } - GRPC_CLOSURE_SCHED(closure, closure->error_data.error); + call_combiner_sched_closure(call_combiner, closure, + closure->error_data.error); break; } } else if (grpc_call_combiner_trace.enabled()) { diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h index 6f7ddd4043..4ec0044f05 100644 --- a/src/core/lib/iomgr/call_combiner.h +++ b/src/core/lib/iomgr/call_combiner.h @@ -27,7 +27,10 @@ #include "src/core/lib/gpr/mpscq.h" #include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/dynamic_annotations.h" // A simple, lock-free mechanism for serializing activity related to a // single call. This is similar to a combiner but is more lightweight. @@ -40,14 +43,38 @@ extern grpc_core::TraceFlag grpc_call_combiner_trace; -typedef struct { - gpr_atm size; // size_t, num closures in queue or currently executing +struct grpc_call_combiner { + gpr_atm size = 0; // size_t, num closures in queue or currently executing gpr_mpscq queue; // Either 0 (if not cancelled and no cancellation closure set), // a grpc_closure* (if the lowest bit is 0), // or a grpc_error* (if the lowest bit is 1). - gpr_atm cancel_state; -} grpc_call_combiner; + gpr_atm cancel_state = 0; +#ifdef GRPC_TSAN_ENABLED + // A fake ref-counted lock that is kept alive after the destruction of + // grpc_call_combiner, when we are running the original closure. + // + // Ideally we want to lock and unlock the call combiner as a pointer, when the + // callback is called. However, original_closure is free to trigger + // anything on the call combiner (including destruction of grpc_call). + // Thus, we need a ref-counted structure that can outlive the call combiner. + struct TsanLock + : public grpc_core::RefCounted<TsanLock, + grpc_core::NonPolymorphicRefCount> { + TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); } + ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); } + + // To avoid double-locking by the same thread, we should acquire/release + // the lock only when taken is false. On each acquire taken must be set to + // true. + std::atomic<bool> taken{false}; + }; + grpc_core::RefCountedPtr<TsanLock> tsan_lock = + grpc_core::MakeRefCounted<TsanLock>(); + grpc_closure tsan_closure; + grpc_closure* original_closure; +#endif +}; // Assumes memory was initialized to zero. void grpc_call_combiner_init(grpc_call_combiner* call_combiner); diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index f14c723844..bde3437c02 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -114,6 +114,7 @@ inline grpc_closure* grpc_closure_init(grpc_closure* closure, closure->cb = cb; closure->cb_arg = cb_arg; closure->scheduler = scheduler; + closure->error_data.error = GRPC_ERROR_NONE; #ifndef NDEBUG closure->scheduled = false; closure->file_initiated = nullptr; diff --git a/src/core/lib/iomgr/dynamic_annotations.h b/src/core/lib/iomgr/dynamic_annotations.h new file mode 100644 index 0000000000..713928023a --- /dev/null +++ b/src/core/lib/iomgr/dynamic_annotations.h @@ -0,0 +1,67 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_DYNAMIC_ANNOTATIONS_H +#define GRPC_CORE_LIB_IOMGR_DYNAMIC_ANNOTATIONS_H + +#include <grpc/support/port_platform.h> + +#ifdef GRPC_TSAN_ENABLED + +#define TSAN_ANNOTATE_HAPPENS_BEFORE(addr) \ + AnnotateHappensBefore(__FILE__, __LINE__, (void*)(addr)) +#define TSAN_ANNOTATE_HAPPENS_AFTER(addr) \ + AnnotateHappensAfter(__FILE__, __LINE__, (void*)(addr)) +#define TSAN_ANNOTATE_RWLOCK_CREATE(addr) \ + AnnotateRWLockCreate(__FILE__, __LINE__, (void*)(addr)) +#define TSAN_ANNOTATE_RWLOCK_DESTROY(addr) \ + AnnotateRWLockDestroy(__FILE__, __LINE__, (void*)(addr)) +#define TSAN_ANNOTATE_RWLOCK_ACQUIRED(addr, is_w) \ + AnnotateRWLockAcquired(__FILE__, __LINE__, (void*)(addr), (is_w)) +#define TSAN_ANNOTATE_RWLOCK_RELEASED(addr, is_w) \ + AnnotateRWLockReleased(__FILE__, __LINE__, (void*)(addr), (is_w)) + +#ifdef __cplusplus +extern "C" { +#endif +void AnnotateHappensBefore(const char* file, int line, const volatile void* cv); +void AnnotateHappensAfter(const char* file, int line, const volatile void* cv); +void AnnotateRWLockCreate(const char* file, int line, + const volatile void* lock); +void AnnotateRWLockDestroy(const char* file, int line, + const volatile void* lock); +void AnnotateRWLockAcquired(const char* file, int line, + const volatile void* lock, long is_w); +void AnnotateRWLockReleased(const char* file, int line, + const volatile void* lock, long is_w); +#ifdef __cplusplus +} +#endif + +#else /* GRPC_TSAN_ENABLED */ + +#define TSAN_ANNOTATE_HAPPENS_BEFORE(addr) +#define TSAN_ANNOTATE_HAPPENS_AFTER(addr) +#define TSAN_ANNOTATE_RWLOCK_CREATE(addr) +#define TSAN_ANNOTATE_RWLOCK_DESTROY(addr) +#define TSAN_ANNOTATE_RWLOCK_ACQUIRED(addr, is_w) +#define TSAN_ANNOTATE_RWLOCK_RELEASED(addr, is_w) + +#endif /* GRPC_TSAN_ENABLED */ + +#endif /* GRPC_CORE_LIB_IOMGR_DYNAMIC_ANNOTATIONS_H */ diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 44fb47e19d..06316c6031 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -61,3 +61,7 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); } grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) { return ep->vtable->get_resource_user(ep); } + +bool grpc_endpoint_can_track_err(grpc_endpoint* ep) { + return ep->vtable->can_track_err(ep); +} diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 1f590a80ca..79c8ece263 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -47,6 +47,7 @@ struct grpc_endpoint_vtable { grpc_resource_user* (*get_resource_user)(grpc_endpoint* ep); char* (*get_peer)(grpc_endpoint* ep); int (*get_fd)(grpc_endpoint* ep); + bool (*can_track_err)(grpc_endpoint* ep); }; /* When data is available on the connection, calls the callback with slices. @@ -95,6 +96,8 @@ void grpc_endpoint_delete_from_pollset_set(grpc_endpoint* ep, grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* endpoint); +bool grpc_endpoint_can_track_err(grpc_endpoint* ep); + struct grpc_endpoint { const grpc_endpoint_vtable* vtable; }; diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index df2cf508c8..7c4bc1ace2 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -315,6 +315,8 @@ char* CFStreamGetPeer(grpc_endpoint* ep) { int CFStreamGetFD(grpc_endpoint* ep) { return 0; } +bool CFStreamCanTrackErr(grpc_endpoint* ep) { return false; } + void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep, @@ -329,7 +331,8 @@ static const grpc_endpoint_vtable vtable = {CFStreamRead, CFStreamDestroy, CFStreamGetResourceUser, CFStreamGetPeer, - CFStreamGetFD}; + CFStreamGetFD, + CFStreamCanTrackErr}; grpc_endpoint* grpc_cfstream_endpoint_create( CFReadStreamRef read_stream, CFWriteStreamRef write_stream, diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 3afbfd7254..5c5c246f99 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_core::ExecCtx exec_ctx; gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args, "socketpair-client"); gpr_free(final_name); diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 38571b1957..4b8c891e9b 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static void shutdown_background_closure(void) {} + static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); @@ -1255,6 +1257,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), true, + false, fd_create, fd_wrapped_fd, @@ -1284,6 +1287,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 06a382c556..7a4870db78 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static void shutdown_background_closure(void) {} + static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); @@ -1612,6 +1614,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), true, + false, fd_create, fd_wrapped_fd, @@ -1641,6 +1644,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 16562538a6..67cbfbbd02 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() { * event engine binding */ +static void shutdown_background_closure(void) {} + static void shutdown_engine(void) { pollset_global_shutdown(); if (grpc_cv_wakeup_fds_enabled()) { @@ -1796,6 +1798,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), false, + false, fd_create, fd_wrapped_fd, @@ -1825,6 +1828,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 8a7dc7b004..32d1b6c43e 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -36,6 +36,7 @@ #include "src/core/lib/iomgr/ev_epoll1_linux.h" #include "src/core/lib/iomgr/ev_epollex_linux.h" #include "src/core/lib/iomgr/ev_poll_posix.h" +#include "src/core/lib/iomgr/internal_errqueue.h" grpc_core::TraceFlag grpc_polling_trace(false, "polling"); /* Disabled by default */ @@ -236,19 +237,22 @@ void grpc_event_engine_shutdown(void) { } bool grpc_event_engine_can_track_errors(void) { -/* Only track errors if platform supports errqueue. */ -#ifdef GRPC_LINUX_ERRQUEUE - return g_event_engine->can_track_err; -#else + /* Only track errors if platform supports errqueue. */ + if (grpc_core::kernel_supports_errqueue()) { + return g_event_engine->can_track_err; + } return false; -#endif /* GRPC_LINUX_ERRQUEUE */ +} + +bool grpc_event_engine_run_in_background(void) { + return g_event_engine->run_in_background; } grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); - return g_event_engine->fd_create(fd, name, - track_err && g_event_engine->can_track_err); + return g_event_engine->fd_create( + fd, name, track_err && grpc_event_engine_can_track_errors()); } int grpc_fd_wrapped_fd(grpc_fd* fd) { @@ -395,4 +399,8 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { g_event_engine->pollset_set_del_fd(pollset_set, fd); } +void grpc_shutdown_background_closure(void) { + g_event_engine->shutdown_background_closure(); +} + #endif // GRPC_POSIX_SOCKET_EV diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index b8fb8f534b..812c7a0f0f 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -42,6 +42,7 @@ typedef struct grpc_fd grpc_fd; typedef struct grpc_event_engine_vtable { size_t pollset_size; bool can_track_err; + bool run_in_background; grpc_fd* (*fd_create)(int fd, const char* name, bool track_err); int (*fd_wrapped_fd)(grpc_fd* fd); @@ -79,6 +80,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); + void (*shutdown_background_closure)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; @@ -101,6 +103,11 @@ const char* grpc_get_poll_strategy_name(); */ bool grpc_event_engine_can_track_errors(); +/* Returns true if polling engine runs in the background, false otherwise. + * Currently only 'epollbg' runs in the background. + */ +bool grpc_event_engine_run_in_background(); + /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. \a track_err if true means that error events would be tracked separately @@ -174,6 +181,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd); void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); +/* Shut down all the closures registered in the background poller. */ +void grpc_shutdown_background_closure(); + /* override to allow tests to hook poll() usage */ typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index e957bad73d..05ecd2a49b 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -60,7 +60,7 @@ void grpc_prefork() { } if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 && strcmp(grpc_get_poll_strategy_name(), "poll") != 0) { - gpr_log(GPR_ERROR, + gpr_log(GPR_INFO, "Fork support is only compatible with the epoll1 and poll polling " "strategies"); } diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc index 99c22e9055..982d709f09 100644 --- a/src/core/lib/iomgr/internal_errqueue.cc +++ b/src/core/lib/iomgr/internal_errqueue.cc @@ -20,17 +20,50 @@ #include "src/core/lib/iomgr/port.h" +#include <grpc/impl/codegen/log.h> #include "src/core/lib/iomgr/internal_errqueue.h" #ifdef GRPC_POSIX_SOCKET_TCP -bool kernel_supports_errqueue() { +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <sys/utsname.h> + +namespace grpc_core { +static bool errqueue_supported = false; + +bool kernel_supports_errqueue() { return errqueue_supported; } + +void grpc_errqueue_init() { +/* Both-compile time and run-time linux kernel versions should be atleast 4.0.0 + */ #ifdef LINUX_VERSION_CODE #if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) - return true; + struct utsname buffer; + if (uname(&buffer) != 0) { + gpr_log(GPR_ERROR, "uname: %s", strerror(errno)); + return; + } + char* release = buffer.release; + if (release == nullptr) { + return; + } + + if (strtol(release, nullptr, 10) >= 4) { + errqueue_supported = true; + } else { + gpr_log(GPR_DEBUG, "ERRQUEUE support not enabled"); + } #endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */ #endif /* LINUX_VERSION_CODE */ - return false; } +} /* namespace grpc_core */ + +#else + +namespace grpc_core { +void grpc_errqueue_init() {} +} /* namespace grpc_core */ #endif /* GRPC_POSIX_SOCKET_TCP */ diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h index 9d122808f9..f8644c2536 100644 --- a/src/core/lib/iomgr/internal_errqueue.h +++ b/src/core/lib/iomgr/internal_errqueue.h @@ -76,8 +76,14 @@ constexpr uint32_t kTimestampingRecordingOptions = * Currently allowing only linux kernels above 4.0.0 */ bool kernel_supports_errqueue(); -} // namespace grpc_core + +} /* namespace grpc_core */ #endif /* GRPC_POSIX_SOCKET_TCP */ +namespace grpc_core { +/* Initializes errqueue support */ +void grpc_errqueue_init(); +} /* namespace grpc_core */ + #endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */ diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 46afda1774..eb29973514 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -33,8 +33,10 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/internal_errqueue.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/timer.h" @@ -57,6 +59,7 @@ void grpc_iomgr_init() { g_root_object.name = (char*)"root"; grpc_network_status_init(); grpc_iomgr_platform_init(); + grpc_core::grpc_errqueue_init(); } void grpc_iomgr_start() { grpc_timer_manager_init(); } @@ -154,6 +157,10 @@ void grpc_iomgr_shutdown() { gpr_cv_destroy(&g_rcv); } +void grpc_iomgr_shutdown_background_closure() { + grpc_iomgr_platform_shutdown_background_closure(); +} + void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) { obj->name = gpr_strdup(name); gpr_mu_lock(&g_mu); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 537ef8a6ff..8ea9289e06 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -35,6 +35,10 @@ void grpc_iomgr_start(); * exec_ctx. */ void grpc_iomgr_shutdown(); +/** Signals the intention to shutdown all the closures registered in the + * background poller. */ +void grpc_iomgr_shutdown_background_closure(); + /* Exposed only for testing */ size_t grpc_iomgr_count_objects_for_testing(); diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index d34c8e7cd1..4b112c9097 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -40,9 +40,11 @@ static void iomgr_platform_init(void) { } static void iomgr_platform_flush(void) {} static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } +static void iomgr_platform_shutdown_background_closure(void) {} static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; + iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_shutdown_background_closure}; void grpc_custom_iomgr_init(grpc_socket_vtable* socket, grpc_custom_resolver_vtable* resolver, diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc index 32dbabb79d..b6c9211865 100644 --- a/src/core/lib/iomgr/iomgr_internal.cc +++ b/src/core/lib/iomgr/iomgr_internal.cc @@ -41,3 +41,7 @@ void grpc_iomgr_platform_init() { iomgr_platform_vtable->init(); } void grpc_iomgr_platform_flush() { iomgr_platform_vtable->flush(); } void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); } + +void grpc_iomgr_platform_shutdown_background_closure() { + iomgr_platform_vtable->shutdown_background_closure(); +} diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index b011d9c7b1..bca7409907 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -35,6 +35,7 @@ typedef struct grpc_iomgr_platform_vtable { void (*init)(void); void (*flush)(void); void (*shutdown)(void); + void (*shutdown_background_closure)(void); } grpc_iomgr_platform_vtable; void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name); @@ -52,6 +53,9 @@ void grpc_iomgr_platform_flush(void); /** tear down all platform specific global iomgr structures */ void grpc_iomgr_platform_shutdown(void); +/** shut down all the closures registered in the background poller */ +void grpc_iomgr_platform_shutdown_background_closure(void); + bool grpc_iomgr_abort_on_leaks(void); #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index ca7334c9a4..9386adf060 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -51,8 +51,13 @@ static void iomgr_platform_shutdown(void) { grpc_wakeup_fd_global_destroy(); } +static void iomgr_platform_shutdown_background_closure(void) { + grpc_shutdown_background_closure(); +} + static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; + iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_shutdown_background_closure}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 235a9e0712..552ef4309c 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -54,8 +54,13 @@ static void iomgr_platform_shutdown(void) { grpc_wakeup_fd_global_destroy(); } +static void iomgr_platform_shutdown_background_closure(void) { + grpc_shutdown_background_closure(); +} + static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; + iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_shutdown_background_closure}; void grpc_set_default_iomgr_platform() { char* enable_cfstream = getenv(grpc_cfstream_env_var); diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index cdef89cbf0..24ef0dba7b 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -71,8 +71,11 @@ static void iomgr_platform_shutdown(void) { winsock_shutdown(); } +static void iomgr_platform_shutdown_background_closure(void) {} + static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; + iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_shutdown_background_closure}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable); diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h index a95e08524c..6f4c5bdd66 100644 --- a/src/core/lib/iomgr/polling_entity.h +++ b/src/core/lib/iomgr/polling_entity.h @@ -34,13 +34,13 @@ typedef enum grpc_pollset_tag { * functions that accept a pollset XOR a pollset_set to do so through an * abstract interface. No ownership is taken. */ -typedef struct grpc_polling_entity { +struct grpc_polling_entity { union { - grpc_pollset* pollset; + grpc_pollset* pollset = nullptr; grpc_pollset_set* pollset_set; } pollent; - grpc_pollset_tag tag; -} grpc_polling_entity; + grpc_pollset_tag tag = GRPC_POLLS_NONE; +}; grpc_polling_entity grpc_polling_entity_create_from_pollset_set( grpc_pollset_set* pollset_set); diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index bf56a7298d..c8046b21dc 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -62,8 +62,7 @@ #define GRPC_HAVE_UNIX_SOCKET 1 #ifdef LINUX_VERSION_CODE #if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) -/* TODO(yashykt): Re-enable once Fathom changes are commited. -#define GRPC_LINUX_ERRQUEUE 1 */ +#define GRPC_LINUX_ERRQUEUE 1 #endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) */ #endif /* LINUX_VERSION_CODE */ #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index 6afe94a7a9..7016ffc31a 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -65,7 +65,7 @@ void grpc_set_resolver_impl(grpc_address_resolver_vtable* vtable); /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. */ -/* TODO(ctiller): add a timeout here */ +/* TODO(apolcyn): add a timeout here */ void grpc_resolve_address(const char* addr, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index b6fc7579f7..7e4b3c9b2f 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -90,7 +90,8 @@ struct grpc_resource_user { grpc_closure_list on_allocated; /* True if we are currently trying to allocate from the quota, false if not */ bool allocating; - /* How many bytes of allocations are outstanding */ + /* The amount of memory (in bytes) that has been requested from this user + * asynchronously but hasn't been granted yet. */ int64_t outstanding_allocations; /* True if we are currently trying to add ourselves to the non-free quota list, false otherwise */ @@ -135,6 +136,9 @@ struct grpc_resource_quota { int64_t size; /* Amount of free memory in the resource quota */ int64_t free_pool; + /* Used size of memory in the resource quota. Updated as soon as the resource + * users start to allocate or free the memory. */ + gpr_atm used; gpr_atm last_size; @@ -371,6 +375,7 @@ static bool rq_reclaim_from_per_user_free_pool( while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) { gpr_mu_lock(&resource_user->mu); + resource_user->added_to_free_pool = false; if (resource_user->free_pool > 0) { int64_t amt = resource_user->free_pool; resource_user->free_pool = 0; @@ -386,6 +391,13 @@ static bool rq_reclaim_from_per_user_free_pool( gpr_mu_unlock(&resource_user->mu); return true; } else { + if (grpc_resource_quota_trace.enabled()) { + gpr_log(GPR_INFO, + "RQ %s %s: failed to reclaim_from_per_user_free_pool; " + "free_pool = %" PRId64 "; rq_free_pool = %" PRId64, + resource_quota->name, resource_user->name, + resource_user->free_pool, resource_quota->free_pool); + } gpr_mu_unlock(&resource_user->mu); } } @@ -622,6 +634,7 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) { resource_quota->combiner = grpc_combiner_create(); resource_quota->free_pool = INT64_MAX; resource_quota->size = INT64_MAX; + resource_quota->used = 0; gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); gpr_mu_init(&resource_quota->thread_count_mu); resource_quota->max_threads = INT_MAX; @@ -712,7 +725,7 @@ size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) { */ grpc_resource_quota* grpc_resource_quota_from_channel_args( - const grpc_channel_args* channel_args) { + const grpc_channel_args* channel_args, bool create) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { if (channel_args->args[i].type == GRPC_ARG_POINTER) { @@ -724,7 +737,7 @@ grpc_resource_quota* grpc_resource_quota_from_channel_args( } } } - return grpc_resource_quota_create(nullptr); + return create ? grpc_resource_quota_create(nullptr) : nullptr; } static void* rq_copy(void* rq) { @@ -863,33 +876,68 @@ void grpc_resource_user_free_threads(grpc_resource_user* resource_user, gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu); } -void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, - grpc_closure* optional_on_done) { - gpr_mu_lock(&resource_user->mu); +static void resource_user_alloc_locked(grpc_resource_user* resource_user, + size_t size, + grpc_closure* optional_on_done) { ru_ref_by(resource_user, static_cast<gpr_atm>(size)); resource_user->free_pool -= static_cast<int64_t>(size); - resource_user->outstanding_allocations += static_cast<int64_t>(size); if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, resource_user->free_pool); } if (resource_user->free_pool < 0) { - grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, - GRPC_ERROR_NONE); + if (optional_on_done != nullptr) { + resource_user->outstanding_allocations += static_cast<int64_t>(size); + grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, + GRPC_ERROR_NONE); + } if (!resource_user->allocating) { resource_user->allocating = true; GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE); } } else { - resource_user->outstanding_allocations -= static_cast<int64_t>(size); GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE); } +} + +bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user, + size_t size) { + if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false; + gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + bool cas_success; + do { + gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used); + gpr_atm new_used = used + size; + if (static_cast<size_t>(new_used) > + grpc_resource_quota_peek_size(resource_quota)) { + gpr_mu_unlock(&resource_user->mu); + return false; + } + cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used); + } while (!cas_success); + resource_user_alloc_locked(resource_user, size, nullptr); + gpr_mu_unlock(&resource_user->mu); + return true; +} + +void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, + grpc_closure* optional_on_done) { + // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this + // because some tests become flaky after the change. + gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + gpr_atm_no_barrier_fetch_add(&resource_quota->used, size); + resource_user_alloc_locked(resource_user, size, optional_on_done); gpr_mu_unlock(&resource_user->mu); } void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) { gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size); + GPR_ASSERT(prior >= static_cast<long>(size)); bool was_zero_or_negative = resource_user->free_pool <= 0; resource_user->free_pool += static_cast<int64_t>(size); if (grpc_resource_quota_trace.enabled()) { @@ -940,6 +988,12 @@ void grpc_resource_user_slice_allocator_init( void grpc_resource_user_alloc_slices( grpc_resource_user_slice_allocator* slice_allocator, size_t length, size_t count, grpc_slice_buffer* dest) { + if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) { + GRPC_CLOSURE_SCHED( + &slice_allocator->on_allocated, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown")); + return; + } slice_allocator->length = length; slice_allocator->count = count; slice_allocator->dest = dest; diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index 7b0ed7417a..1c79b52e3f 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -65,11 +65,16 @@ extern grpc_core::TraceFlag grpc_resource_quota_trace; +// TODO(juanlishen): This is a hack. We need to do real accounting instead of +// hard coding. +constexpr size_t GRPC_RESOURCE_QUOTA_CALL_SIZE = 15 * 1024; +constexpr size_t GRPC_RESOURCE_QUOTA_CHANNEL_SIZE = 50 * 1024; + grpc_resource_quota* grpc_resource_quota_ref_internal( grpc_resource_quota* resource_quota); void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota); grpc_resource_quota* grpc_resource_quota_from_channel_args( - const grpc_channel_args* channel_args); + const grpc_channel_args* channel_args, bool create = true); /* Return a number indicating current memory pressure: 0.0 ==> no memory usage @@ -109,11 +114,21 @@ bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user, void grpc_resource_user_free_threads(grpc_resource_user* resource_user, int thread_count); -/* Allocate from the resource user (and its quota). - If optional_on_done is NULL, then allocate immediately. This may push the - quota over-limit, at which point reclamation will kick in. - If optional_on_done is non-NULL, it will be scheduled when the allocation has - been granted by the quota. */ +/* Allocates from the resource user 'size' worth of memory if this won't exceed + * the resource quota's total size. Returns whether the allocation is done + * successfully. If allocated successfully, the memory should be freed by the + * caller eventually. */ +bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user, + size_t size); +/* Allocates from the resource user 'size' worth of memory. + * If optional_on_done is NULL, then allocate immediately. This may push the + * quota over-limit, at which point reclamation will kick in. The caller is + * always responsible to free the memory eventually. + * If optional_on_done is non-NULL, it will be scheduled without error when the + * allocation has been granted by the quota, and the caller is responsible to + * free the memory eventually. Or it may be scheduled with an error, in which + * case the caller fails to allocate the memory and shouldn't free the memory. + */ void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, grpc_closure* optional_on_done); /* Release memory back to the quota */ diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index bdfc1d70c3..4c337a0521 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -296,14 +296,17 @@ grpc_error* grpc_set_socket_tcp_user_timeout( socklen_t len = sizeof(newval); if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout, sizeof(timeout))) { - return GRPC_OS_ERROR(errno, "setsockopt(TCP_USER_TIMEOUT)"); + gpr_log(GPR_ERROR, "setsockopt(TCP_USER_TIMEOUT) %s", strerror(errno)); + return GRPC_ERROR_NONE; } if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) { - return GRPC_OS_ERROR(errno, "getsockopt(TCP_USER_TIMEOUT)"); + gpr_log(GPR_ERROR, "getsockopt(TCP_USER_TIMEOUT) %s", strerror(errno)); + return GRPC_ERROR_NONE; } if (newval != timeout) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Failed to set TCP_USER_TIMEOUT"); + /* Do not fail on failing to set TCP_USER_TIMEOUT for now. */ + gpr_log(GPR_ERROR, "Failed to set TCP_USER_TIMEOUT"); + return GRPC_ERROR_NONE; } } #else diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 8553ed0db4..0bff74e88b 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -76,6 +76,8 @@ static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd, if (!grpc_is_unix_socket(addr)) { err = grpc_set_socket_low_latency(fd, 1); if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_reuse_addr(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; err = grpc_set_socket_tcp_user_timeout(fd, channel_args, true /* is_client */); if (err != GRPC_ERROR_NONE) goto error; diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index e02a1898f2..f7a5f36cdc 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -326,6 +326,8 @@ static grpc_resource_user* endpoint_get_resource_user(grpc_endpoint* ep) { static int endpoint_get_fd(grpc_endpoint* ep) { return -1; } +static bool endpoint_can_track_err(grpc_endpoint* ep) { return false; } + static grpc_endpoint_vtable vtable = {endpoint_read, endpoint_write, endpoint_add_to_pollset, @@ -335,7 +337,8 @@ static grpc_endpoint_vtable vtable = {endpoint_read, endpoint_destroy, endpoint_get_resource_user, endpoint_get_peer, - endpoint_get_fd}; + endpoint_get_fd, + endpoint_can_track_err}; grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket, grpc_resource_quota* resource_quota, diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index aa2704ce26..cfcb190d60 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -260,10 +260,17 @@ static void notify_on_write(grpc_tcp* tcp) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p notify_on_write", tcp); } - cover_self(tcp); - GRPC_CLOSURE_INIT(&tcp->write_done_closure, - tcp_drop_uncovered_then_handle_write, tcp, - grpc_schedule_on_exec_ctx); + if (grpc_event_engine_run_in_background()) { + // If there is a polling engine always running in the background, there is + // no need to run the backup poller. + GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_handle_write, tcp, + grpc_schedule_on_exec_ctx); + } else { + cover_self(tcp); + GRPC_CLOSURE_INIT(&tcp->write_done_closure, + tcp_drop_uncovered_then_handle_write, tcp, + grpc_schedule_on_exec_ctx); + } grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure); } @@ -384,6 +391,12 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } @@ -621,7 +634,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, if (sending_length == static_cast<size_t>(length)) { gpr_mu_lock(&tcp->tb_mu); grpc_core::TracedBuffer::AddNewEntry( - &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length), + &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length), tcp->outgoing_buffer_arg); gpr_mu_unlock(&tcp->tb_mu); tcp->outgoing_buffer_arg = nullptr; @@ -673,11 +686,9 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, } /** For linux platforms, reads the socket's error queue and processes error - * messages from the queue. Returns true if all the errors processed were - * timestamps. Returns false if any of the errors were not timestamps. For - * non-linux platforms, error processing is not used/enabled currently. + * messages from the queue. */ -static bool process_errors(grpc_tcp* tcp) { +static void process_errors(grpc_tcp* tcp) { while (true) { struct iovec iov; iov.iov_base = nullptr; @@ -706,10 +717,10 @@ static bool process_errors(grpc_tcp* tcp) { } while (r < 0 && saved_errno == EINTR); if (r == -1 && saved_errno == EAGAIN) { - return true; /* No more errors to process */ + return; /* No more errors to process */ } if (r == -1) { - return false; + return; } if (grpc_tcp_trace.enabled()) { if ((msg.msg_flags & MSG_CTRUNC) == 1) { @@ -719,8 +730,9 @@ static bool process_errors(grpc_tcp* tcp) { if (msg.msg_controllen == 0) { /* There was no control message found. It was probably spurious. */ - return true; + return; } + bool seen = false; for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (cmsg->cmsg_level != SOL_SOCKET || @@ -732,9 +744,13 @@ static bool process_errors(grpc_tcp* tcp) { "unknown control message cmsg_level:%d cmsg_type:%d", cmsg->cmsg_level, cmsg->cmsg_type); } - return false; + return; } - process_timestamp(tcp, &msg, cmsg); + cmsg = process_timestamp(tcp, &msg, cmsg); + seen = true; + } + if (!seen) { + return; } } } @@ -749,20 +765,17 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) { /* We aren't going to register to hear on error anymore, so it is safe to * unref. */ - grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error)); TCP_UNREF(tcp, "error-tracking"); return; } /* We are still interested in collecting timestamps, so let's try reading * them. */ - if (!process_errors(tcp)) { - /* This was not a timestamps error. This was an actual error. Set the - * read and write closures to be ready. - */ - grpc_fd_set_readable(tcp->em_fd); - grpc_fd_set_writable(tcp->em_fd); - } + process_errors(tcp); + /* This might not a timestamps error. Set the read and write closures to be + * ready. */ + grpc_fd_set_readable(tcp->em_fd); + grpc_fd_set_writable(tcp->em_fd); GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); @@ -784,6 +797,19 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { } #endif /* GRPC_LINUX_ERRQUEUE */ +/* If outgoing_buffer_arg is filled, shuts down the list early, so that any + * release operations needed can be performed on the arg */ +void tcp_shutdown_buffer_list(grpc_tcp* tcp) { + if (tcp->outgoing_buffer_arg) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; + } +} + /* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX @@ -831,8 +857,10 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_flags = 0; if (tcp->outgoing_buffer_arg != nullptr) { if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, - error)) + error)) { + tcp_shutdown_buffer_list(tcp); return true; /* something went wrong with timestamps */ + } } else { msg.msg_control = nullptr; msg.msg_controllen = 0; @@ -856,10 +884,12 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } else if (errno == EPIPE) { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + tcp_shutdown_buffer_list(tcp); return true; } else { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + tcp_shutdown_buffer_list(tcp); return true; } } @@ -936,17 +966,18 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, GPR_ASSERT(tcp->write_cb == nullptr); + tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { GRPC_CLOSURE_SCHED( cb, grpc_fd_is_shutdown(tcp->em_fd) ? tcp_annotate_error( GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp) : GRPC_ERROR_NONE); + tcp_shutdown_buffer_list(tcp); return; } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; - tcp->outgoing_buffer_arg = arg; if (arg) { GPR_ASSERT(grpc_event_engine_can_track_errors()); } @@ -999,6 +1030,22 @@ static grpc_resource_user* tcp_get_resource_user(grpc_endpoint* ep) { return tcp->resource_user; } +static bool tcp_can_track_err(grpc_endpoint* ep) { + grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); + if (!grpc_event_engine_can_track_errors()) { + return false; + } + struct sockaddr addr; + socklen_t len = sizeof(addr); + if (getsockname(tcp->fd, &addr, &len) < 0) { + return false; + } + if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) { + return true; + } + return false; +} + static const grpc_endpoint_vtable vtable = {tcp_read, tcp_write, tcp_add_to_pollset, @@ -1008,7 +1055,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_destroy, tcp_get_resource_user, tcp_get_peer, - tcp_get_fd}; + tcp_get_fd, + tcp_can_track_err}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 @@ -1069,6 +1117,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->is_first_read = true; tcp->bytes_counter = -1; tcp->socket_ts_enabled = false; + tcp->outgoing_buffer_arg = nullptr; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -1113,6 +1162,12 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { /* Stop errors notification. */ + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index 64c4a56ae9..86ee1010cf 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -42,6 +42,7 @@ #include "src/core/lib/iomgr/tcp_windows.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #if defined(__MSYS__) && defined(GPR_ARCH_64) /* Nasty workaround for nasty bug when using the 64 bits msys compiler @@ -112,7 +113,10 @@ typedef struct grpc_tcp { grpc_closure* read_cb; grpc_closure* write_cb; - grpc_slice read_slice; + + /* garbage after the last read */ + grpc_slice_buffer last_read_buffer; + grpc_slice_buffer* write_slices; grpc_slice_buffer* read_slices; @@ -131,6 +135,7 @@ static void tcp_free(grpc_tcp* tcp) { grpc_winsocket_destroy(tcp->socket); gpr_mu_destroy(&tcp->mu); gpr_free(tcp->peer_string); + grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error); gpr_free(tcp); @@ -179,9 +184,12 @@ static void on_read(void* tcpp, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)tcpp; grpc_closure* cb = tcp->read_cb; grpc_winsocket* socket = tcp->socket; - grpc_slice sub; grpc_winsocket_callback_info* info = &socket->read_info; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p on_read", tcp); + } + GRPC_ERROR_REF(error); if (error == GRPC_ERROR_NONE) { @@ -189,13 +197,35 @@ static void on_read(void* tcpp, grpc_error* error) { char* utf8_message = gpr_format_message(info->wsa_error); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message); gpr_free(utf8_message); - grpc_slice_unref_internal(tcp->read_slice); + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); } else { if (info->bytes_transfered != 0 && !tcp->shutting_down) { - sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); - grpc_slice_buffer_add(tcp->read_slices, sub); + GPR_ASSERT((size_t)info->bytes_transfered <= tcp->read_slices->length); + if (static_cast<size_t>(info->bytes_transfered) != + tcp->read_slices->length) { + grpc_slice_buffer_trim_end( + tcp->read_slices, + tcp->read_slices->length - + static_cast<size_t>(info->bytes_transfered), + &tcp->last_read_buffer); + } + GPR_ASSERT((size_t)info->bytes_transfered == tcp->read_slices->length); + + if (grpc_tcp_trace.enabled()) { + size_t i; + for (i = 0; i < tcp->read_slices->count; i++) { + char* dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string, + dump); + gpr_free(dump); + } + } } else { - grpc_slice_unref_internal(tcp->read_slice); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p unref read_slice", tcp); + } + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); error = tcp->shutting_down ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "TCP stream shutting down", &tcp->shutdown_error, 1) @@ -209,6 +239,8 @@ static void on_read(void* tcpp, grpc_error* error) { GRPC_CLOSURE_SCHED(cb, error); } +#define DEFAULT_TARGET_READ_SIZE 8192 +#define MAX_WSABUF_COUNT 16 static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, grpc_closure* cb) { grpc_tcp* tcp = (grpc_tcp*)ep; @@ -217,7 +249,12 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, int status; DWORD bytes_read = 0; DWORD flags = 0; - WSABUF buffer; + WSABUF buffers[MAX_WSABUF_COUNT]; + size_t i; + + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p win_read", tcp); + } if (tcp->shutting_down) { GRPC_CLOSURE_SCHED( @@ -229,18 +266,27 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, tcp->read_cb = cb; tcp->read_slices = read_slices; grpc_slice_buffer_reset_and_unref_internal(read_slices); + grpc_slice_buffer_swap(read_slices, &tcp->last_read_buffer); - tcp->read_slice = GRPC_SLICE_MALLOC(8192); + if (tcp->read_slices->length < DEFAULT_TARGET_READ_SIZE / 2 && + tcp->read_slices->count < MAX_WSABUF_COUNT) { + // TODO(jtattermusch): slice should be allocated using resource quota + grpc_slice_buffer_add(tcp->read_slices, + GRPC_SLICE_MALLOC(DEFAULT_TARGET_READ_SIZE)); + } - buffer.len = (ULONG)GRPC_SLICE_LENGTH( - tcp->read_slice); // we know slice size fits in 32bit. - buffer.buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slice); + GPR_ASSERT(tcp->read_slices->count <= MAX_WSABUF_COUNT); + for (i = 0; i < tcp->read_slices->count; i++) { + buffers[i].len = (ULONG)GRPC_SLICE_LENGTH( + tcp->read_slices->slices[i]); // we know slice size fits in 32bit. + buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[i]); + } TCP_REF(tcp, "read"); /* First let's try a synchronous, non-blocking read. */ - status = - WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL); + status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count, + &bytes_read, &flags, NULL, NULL); info->wsa_error = status == 0 ? 0 : WSAGetLastError(); /* Did we get data immediately ? Yay. */ @@ -252,8 +298,8 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, /* Otherwise, let's retry, by queuing a read. */ memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED)); - status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, - &info->overlapped, NULL); + status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count, + &bytes_read, &flags, &info->overlapped, NULL); if (status != 0) { int wsa_error = WSAGetLastError(); @@ -275,6 +321,10 @@ static void on_write(void* tcpp, grpc_error* error) { grpc_winsocket_callback_info* info = &handle->write_info; grpc_closure* cb; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p on_write", tcp); + } + GRPC_ERROR_REF(error); gpr_mu_lock(&tcp->mu); @@ -303,11 +353,21 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices, unsigned i; DWORD bytes_sent; int status; - WSABUF local_buffers[16]; + WSABUF local_buffers[MAX_WSABUF_COUNT]; WSABUF* allocated = NULL; WSABUF* buffers = local_buffers; size_t len; + if (grpc_tcp_trace.enabled()) { + size_t i; + for (i = 0; i < slices->count; i++) { + char* data = + grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); + gpr_free(data); + } + } + if (tcp->shutting_down) { GRPC_CLOSURE_SCHED( cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -412,6 +472,7 @@ static void win_shutdown(grpc_endpoint* ep, grpc_error* why) { static void win_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = (grpc_tcp*)ep; + grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); TCP_UNREF(tcp, "destroy"); } @@ -427,6 +488,8 @@ static grpc_resource_user* win_get_resource_user(grpc_endpoint* ep) { static int win_get_fd(grpc_endpoint* ep) { return -1; } +static bool win_can_track_err(grpc_endpoint* ep) { return false; } + static grpc_endpoint_vtable vtable = {win_read, win_write, win_add_to_pollset, @@ -436,7 +499,8 @@ static grpc_endpoint_vtable vtable = {win_read, win_destroy, win_get_resource_user, win_get_peer, - win_get_fd}; + win_get_fd, + win_can_track_err}; grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket, grpc_channel_args* channel_args, @@ -460,6 +524,7 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket, GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx); tcp->peer_string = gpr_strdup(peer_string); + grpc_slice_buffer_init(&tcp->last_read_buffer); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); |