aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-08-28 12:55:40 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-08-28 12:55:40 -0700
commit6076b1d7982b4d0778dd68c236075b5c36b72e0d (patch)
tree1da2c80065a6844ca0480bd83217792cc3185886 /src/core/lib
parent3f002567c4c1f8cc7542aeee2d60d6d0c4c5dd54 (diff)
parent4b5b019d5644affef122e06c6898811286850b8d (diff)
Merge branch 'channelz-subchannels' into channelz-server
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channelz_registry.cc7
-rw-r--r--src/core/lib/channel/connected_channel.cc4
-rw-r--r--src/core/lib/channel/connected_channel.h4
-rw-r--r--src/core/lib/gpr/arena.cc77
-rw-r--r--src/core/lib/gprpp/fork.cc74
-rw-r--r--src/core/lib/gprpp/fork.h17
-rw-r--r--src/core/lib/gprpp/mutex_lock.h42
-rw-r--r--src/core/lib/http/httpcli.cc2
-rw-r--r--src/core/lib/iomgr/buffer_list.cc134
-rw-r--r--src/core/lib/iomgr/buffer_list.h96
-rw-r--r--src/core/lib/iomgr/endpoint.cc4
-rw-r--r--src/core/lib/iomgr/endpoint.h8
-rw-r--r--src/core/lib/iomgr/endpoint_cfstream.cc2
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc72
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc3
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc114
-rw-r--r--src/core/lib/iomgr/ev_posix.cc70
-rw-r--r--src/core/lib/iomgr/ev_posix.h10
-rw-r--r--src/core/lib/iomgr/exec_ctx.cc6
-rw-r--r--src/core/lib/iomgr/exec_ctx.h2
-rw-r--r--src/core/lib/iomgr/fork_posix.cc13
-rw-r--r--src/core/lib/iomgr/internal_errqueue.cc36
-rw-r--r--src/core/lib/iomgr/internal_errqueue.h83
-rw-r--r--src/core/lib/iomgr/port.h17
-rw-r--r--src/core/lib/iomgr/resource_quota.cc78
-rw-r--r--src/core/lib/iomgr/resource_quota.h16
-rw-r--r--src/core/lib/iomgr/socket_mutator.cc2
-rw-r--r--src/core/lib/iomgr/socket_mutator.h2
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_custom.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc312
-rw-r--r--src/core/lib/iomgr/tcp_posix.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc4
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc2
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc2
-rw-r--r--src/core/lib/iomgr/timer.h5
-rw-r--r--src/core/lib/iomgr/udp_server.cc2
-rw-r--r--src/core/lib/security/credentials/jwt/json_token.h2
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.cc2
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.cc2
-rw-r--r--src/core/lib/security/transport/secure_endpoint.cc4
-rw-r--r--src/core/lib/security/transport/security_handshaker.cc2
-rw-r--r--src/core/lib/surface/call.cc2
-rw-r--r--src/core/lib/surface/channel_init.h28
-rw-r--r--src/core/lib/surface/completion_queue.cc167
-rw-r--r--src/core/lib/surface/completion_queue.h21
-rw-r--r--src/core/lib/surface/completion_queue_factory.cc17
-rw-r--r--src/core/lib/surface/init.cc26
-rw-r--r--src/core/lib/surface/init.h1
-rw-r--r--src/core/lib/surface/init_secure.cc11
-rw-r--r--src/core/lib/surface/version.cc2
-rw-r--r--src/core/lib/transport/service_config.cc4
-rw-r--r--src/core/lib/transport/service_config.h6
54 files changed, 1405 insertions, 225 deletions
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index 285f641ba9..adc7b6ba44 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -23,6 +23,7 @@
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -53,7 +54,7 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); }
ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) {
- mu_guard guard(&mu_);
+ MutexLock lock(&mu_);
entities_.push_back(node);
intptr_t uuid = entities_.size();
return uuid;
@@ -61,13 +62,13 @@ intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) {
void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
GPR_ASSERT(uuid >= 1);
- mu_guard guard(&mu_);
+ MutexLock lock(&mu_);
GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size());
entities_[uuid - 1] = nullptr;
}
BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) {
- mu_guard guard(&mu_);
+ MutexLock lock(&mu_);
if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
return nullptr;
}
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index 45be90af5f..90a0254663 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -230,8 +230,8 @@ static void bind_transport(grpc_channel_stack* channel_stack,
grpc_transport_stream_size(static_cast<grpc_transport*>(t));
}
-bool grpc_append_connected_filter(grpc_channel_stack_builder* builder,
- void* arg_must_be_null) {
+bool grpc_add_connected_filter(grpc_channel_stack_builder* builder,
+ void* arg_must_be_null) {
GPR_ASSERT(arg_must_be_null == nullptr);
grpc_transport* t = grpc_channel_stack_builder_get_transport(builder);
GPR_ASSERT(t != nullptr);
diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h
index 280daf040d..faa1c73a21 100644
--- a/src/core/lib/channel/connected_channel.h
+++ b/src/core/lib/channel/connected_channel.h
@@ -25,8 +25,8 @@
extern const grpc_channel_filter grpc_connected_filter;
-bool grpc_append_connected_filter(grpc_channel_stack_builder* builder,
- void* arg_must_be_null);
+bool grpc_add_connected_filter(grpc_channel_stack_builder* builder,
+ void* arg_must_be_null);
/* Debug helper to dig the transport stream out of a call element */
grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem);
diff --git a/src/core/lib/gpr/arena.cc b/src/core/lib/gpr/arena.cc
index e30b297aea..77f9357146 100644
--- a/src/core/lib/gpr/arena.cc
+++ b/src/core/lib/gpr/arena.cc
@@ -77,16 +77,16 @@ void* gpr_arena_alloc(gpr_arena* arena, size_t size) {
// would allow us to use the alignment actually needed by the caller.
typedef struct zone {
- size_t size_begin; // All the space we have set aside for allocations up
- // until this zone.
- size_t size_end; // size_end = size_begin plus all the space we set aside for
- // allocations in zone z itself.
zone* next;
} zone;
struct gpr_arena {
- gpr_atm size_so_far;
+ // Keep track of the total used size. We use this in our call sizing
+ // historesis.
+ gpr_atm total_used;
+ size_t initial_zone_size;
zone initial_zone;
+ zone* last_zone;
gpr_mu arena_growth_mutex;
};
@@ -100,14 +100,15 @@ gpr_arena* gpr_arena_create(size_t initial_size) {
initial_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(initial_size);
gpr_arena* a = static_cast<gpr_arena*>(zalloc_aligned(
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + initial_size));
- a->initial_zone.size_end = initial_size;
+ a->initial_zone_size = initial_size;
+ a->last_zone = &a->initial_zone;
gpr_mu_init(&a->arena_growth_mutex);
return a;
}
size_t gpr_arena_destroy(gpr_arena* arena) {
gpr_mu_destroy(&arena->arena_growth_mutex);
- gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far);
+ gpr_atm size = gpr_atm_no_barrier_load(&arena->total_used);
zone* z = arena->initial_zone.next;
gpr_free_aligned(arena);
while (z) {
@@ -120,55 +121,25 @@ size_t gpr_arena_destroy(gpr_arena* arena) {
void* gpr_arena_alloc(gpr_arena* arena, size_t size) {
size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(size);
- size_t previous_size_of_arena_allocations = static_cast<size_t>(
- gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size));
- size_t updated_size_of_arena_allocations =
- previous_size_of_arena_allocations + size;
- zone* z = &arena->initial_zone;
- // Check to see if the allocation isn't able to end in the initial zone.
- // This statement is true only in the uncommon case because of our arena
- // sizing historesis (that is, most calls should have a large enough initial
- // zone and will not need to grow the arena).
- if (updated_size_of_arena_allocations > z->size_end) {
- // Find a zone to fit this allocation
+ size_t begin = gpr_atm_no_barrier_fetch_add(&arena->total_used, size);
+ if (begin + size <= arena->initial_zone_size) {
+ return reinterpret_cast<char*>(arena) +
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + begin;
+ } else {
+ // If the allocation isn't able to end in the initial zone, create a new
+ // zone for this allocation, and any unused space in the initial zone is
+ // wasted. This overflowing and wasting is uncommon because of our arena
+ // sizing historesis (that is, most calls should have a large enough initial
+ // zone and will not need to grow the arena).
gpr_mu_lock(&arena->arena_growth_mutex);
- while (updated_size_of_arena_allocations > z->size_end) {
- if (z->next == nullptr) {
- // Note that we do an extra increment of size_so_far to prevent multiple
- // simultaneous callers from stepping on each other. However, this extra
- // increment means some space in the arena is wasted.
- // So whenever we need to allocate x bytes and there are x - n (where
- // n > 0) remaining in the current zone, we will waste x bytes (x - n
- // in the current zone and n in the new zone).
- previous_size_of_arena_allocations = static_cast<size_t>(
- gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size));
- updated_size_of_arena_allocations =
- previous_size_of_arena_allocations + size;
- size_t next_z_size = updated_size_of_arena_allocations;
- z->next = static_cast<zone*>(zalloc_aligned(
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + next_z_size));
- z->next->size_begin = z->size_end;
- z->next->size_end = z->size_end + next_z_size;
- }
- z = z->next;
- }
+ zone* z = static_cast<zone*>(
+ zalloc_aligned(GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + size));
+ arena->last_zone->next = z;
+ arena->last_zone = z;
gpr_mu_unlock(&arena->arena_growth_mutex);
+ return reinterpret_cast<char*>(z) +
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone));
}
- GPR_ASSERT(previous_size_of_arena_allocations >= z->size_begin);
- GPR_ASSERT(updated_size_of_arena_allocations <= z->size_end);
- // Skip the first part of the zone, which just contains tracking information.
- // For the initial zone, this is the gpr_arena struct and for any other zone,
- // it's the zone struct.
- char* start_of_allocation_space =
- (z == &arena->initial_zone)
- ? reinterpret_cast<char*>(arena) +
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena))
- : reinterpret_cast<char*>(z) +
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone));
- // previous_size_of_arena_allocations - size_begin is how many bytes have been
- // allocated into the current zone
- return start_of_allocation_space + previous_size_of_arena_allocations -
- z->size_begin;
}
#endif // SIMPLE_ARENA_FOR_DEBUGGING
diff --git a/src/core/lib/gprpp/fork.cc b/src/core/lib/gprpp/fork.cc
index f6d9a87d2c..3b9c16510a 100644
--- a/src/core/lib/gprpp/fork.cc
+++ b/src/core/lib/gprpp/fork.cc
@@ -157,11 +157,11 @@ class ThreadState {
} // namespace
void Fork::GlobalInit() {
- if (!overrideEnabled_) {
+ if (!override_enabled_) {
#ifdef GRPC_ENABLE_FORK_SUPPORT
- supportEnabled_ = true;
+ support_enabled_ = true;
#else
- supportEnabled_ = false;
+ support_enabled_ = false;
#endif
bool env_var_set = false;
char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT");
@@ -172,7 +172,7 @@ void Fork::GlobalInit() {
"False", "FALSE", "0"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == strcmp(env, truthy[i])) {
- supportEnabled_ = true;
+ support_enabled_ = true;
env_var_set = true;
break;
}
@@ -180,7 +180,7 @@ void Fork::GlobalInit() {
if (!env_var_set) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) {
if (0 == strcmp(env, falsey[i])) {
- supportEnabled_ = false;
+ support_enabled_ = false;
env_var_set = true;
break;
}
@@ -189,72 +189,80 @@ void Fork::GlobalInit() {
gpr_free(env);
}
}
- if (supportEnabled_) {
- execCtxState_ = grpc_core::New<internal::ExecCtxState>();
- threadState_ = grpc_core::New<internal::ThreadState>();
+ if (support_enabled_) {
+ exec_ctx_state_ = grpc_core::New<internal::ExecCtxState>();
+ thread_state_ = grpc_core::New<internal::ThreadState>();
}
}
void Fork::GlobalShutdown() {
- if (supportEnabled_) {
- grpc_core::Delete(execCtxState_);
- grpc_core::Delete(threadState_);
+ if (support_enabled_) {
+ grpc_core::Delete(exec_ctx_state_);
+ grpc_core::Delete(thread_state_);
}
}
-bool Fork::Enabled() { return supportEnabled_; }
+bool Fork::Enabled() { return support_enabled_; }
// Testing Only
void Fork::Enable(bool enable) {
- overrideEnabled_ = true;
- supportEnabled_ = enable;
+ override_enabled_ = true;
+ support_enabled_ = enable;
}
void Fork::IncExecCtxCount() {
- if (supportEnabled_) {
- execCtxState_->IncExecCtxCount();
+ if (support_enabled_) {
+ exec_ctx_state_->IncExecCtxCount();
}
}
void Fork::DecExecCtxCount() {
- if (supportEnabled_) {
- execCtxState_->DecExecCtxCount();
+ if (support_enabled_) {
+ exec_ctx_state_->DecExecCtxCount();
}
}
+void Fork::SetResetChildPollingEngineFunc(
+ Fork::child_postfork_func reset_child_polling_engine) {
+ reset_child_polling_engine_ = reset_child_polling_engine;
+}
+Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
+ return reset_child_polling_engine_;
+}
+
bool Fork::BlockExecCtx() {
- if (supportEnabled_) {
- return execCtxState_->BlockExecCtx();
+ if (support_enabled_) {
+ return exec_ctx_state_->BlockExecCtx();
}
return false;
}
void Fork::AllowExecCtx() {
- if (supportEnabled_) {
- execCtxState_->AllowExecCtx();
+ if (support_enabled_) {
+ exec_ctx_state_->AllowExecCtx();
}
}
void Fork::IncThreadCount() {
- if (supportEnabled_) {
- threadState_->IncThreadCount();
+ if (support_enabled_) {
+ thread_state_->IncThreadCount();
}
}
void Fork::DecThreadCount() {
- if (supportEnabled_) {
- threadState_->DecThreadCount();
+ if (support_enabled_) {
+ thread_state_->DecThreadCount();
}
}
void Fork::AwaitThreads() {
- if (supportEnabled_) {
- threadState_->AwaitThreads();
+ if (support_enabled_) {
+ thread_state_->AwaitThreads();
}
}
-internal::ExecCtxState* Fork::execCtxState_ = nullptr;
-internal::ThreadState* Fork::threadState_ = nullptr;
-bool Fork::supportEnabled_ = false;
-bool Fork::overrideEnabled_ = false;
-
+internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr;
+internal::ThreadState* Fork::thread_state_ = nullptr;
+bool Fork::support_enabled_ = false;
+bool Fork::override_enabled_ = false;
+Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
} // namespace grpc_core
diff --git a/src/core/lib/gprpp/fork.h b/src/core/lib/gprpp/fork.h
index 123e22c4c6..5a7404f0d9 100644
--- a/src/core/lib/gprpp/fork.h
+++ b/src/core/lib/gprpp/fork.h
@@ -33,6 +33,8 @@ class ThreadState;
class Fork {
public:
+ typedef void (*child_postfork_func)(void);
+
static void GlobalInit();
static void GlobalShutdown();
@@ -46,6 +48,12 @@ class Fork {
// Decrement the count of active ExecCtxs
static void DecExecCtxCount();
+ // Provide a function that will be invoked in the child's postfork handler to
+ // reset the polling engine's internal state.
+ static void SetResetChildPollingEngineFunc(
+ child_postfork_func reset_child_polling_engine);
+ static child_postfork_func GetResetChildPollingEngineFunc();
+
// Check if there is a single active ExecCtx
// (the one used to invoke this function). If there are more,
// return false. Otherwise, return true and block creation of
@@ -68,10 +76,11 @@ class Fork {
static void Enable(bool enable);
private:
- static internal::ExecCtxState* execCtxState_;
- static internal::ThreadState* threadState_;
- static bool supportEnabled_;
- static bool overrideEnabled_;
+ static internal::ExecCtxState* exec_ctx_state_;
+ static internal::ThreadState* thread_state_;
+ static bool support_enabled_;
+ static bool override_enabled_;
+ static child_postfork_func reset_child_polling_engine_;
};
} // namespace grpc_core
diff --git a/src/core/lib/gprpp/mutex_lock.h b/src/core/lib/gprpp/mutex_lock.h
new file mode 100644
index 0000000000..54751d5fe4
--- /dev/null
+++ b/src/core/lib/gprpp/mutex_lock.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H
+#define GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/sync.h>
+
+namespace grpc_core {
+
+class MutexLock {
+ public:
+ explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu); }
+ ~MutexLock() { gpr_mu_unlock(mu_); }
+
+ MutexLock(const MutexLock&) = delete;
+ MutexLock& operator=(const MutexLock&) = delete;
+
+ private:
+ gpr_mu* const mu_;
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H */
diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc
index 12060074c5..3bd7a2ce59 100644
--- a/src/core/lib/http/httpcli.cc
+++ b/src/core/lib/http/httpcli.cc
@@ -163,7 +163,7 @@ static void done_write(void* arg, grpc_error* error) {
static void start_write(internal_request* req) {
grpc_slice_ref_internal(req->request_text);
grpc_slice_buffer_add(&req->outgoing, req->request_text);
- grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write);
+ grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, nullptr);
}
static void on_handshake_done(void* arg, grpc_endpoint* ep) {
diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc
new file mode 100644
index 0000000000..6ada23db1c
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_list.cc
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/buffer_list.h"
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/log.h>
+
+#ifdef GRPC_LINUX_ERRQUEUE
+#include <time.h>
+
+#include "src/core/lib/gprpp/memory.h"
+
+namespace grpc_core {
+void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
+ void* arg) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
+ /* Store the current time as the sendmsg time. */
+ new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME);
+ if (*head == nullptr) {
+ *head = new_elem;
+ return;
+ }
+ /* Append at the end. */
+ TracedBuffer* ptr = *head;
+ while (ptr->next_ != nullptr) {
+ ptr = ptr->next_;
+ }
+ ptr->next_ = new_elem;
+}
+
+namespace {
+/** Fills gpr_timespec gts based on values from timespec ts */
+void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
+ gts->tv_sec = ts->tv_sec;
+ gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
+ gts->clock_type = GPR_CLOCK_REALTIME;
+}
+
+/** The saved callback function that will be invoked when we get all the
+ * timestamps that we are going to get for a TracedBuffer. */
+void (*timestamps_callback)(void*, grpc_core::Timestamps*,
+ grpc_error* shutdown_err);
+} /* namespace */
+
+void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
+ struct sock_extended_err* serr,
+ struct scm_timestamping* tss) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* elem = *head;
+ TracedBuffer* next = nullptr;
+ while (elem != nullptr) {
+ /* The byte number refers to the sequence number of the last byte which this
+ * timestamp relates to. */
+ if (serr->ee_data >= elem->seq_no_) {
+ switch (serr->ee_info) {
+ case SCM_TSTAMP_SCHED:
+ fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0]));
+ elem = elem->next_;
+ break;
+ case SCM_TSTAMP_SND:
+ fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0]));
+ elem = elem->next_;
+ break;
+ case SCM_TSTAMP_ACK:
+ fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0]));
+ /* Got all timestamps. Do the callback and free this TracedBuffer.
+ * The thing below can be passed by value if we don't want the
+ * restriction on the lifetime. */
+ timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE);
+ next = elem->next_;
+ Delete<TracedBuffer>(elem);
+ *head = elem = next;
+ break;
+ default:
+ abort();
+ }
+ } else {
+ break;
+ }
+ }
+}
+
+void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* elem = *head;
+ while (elem != nullptr) {
+ if (timestamps_callback) {
+ timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
+ }
+ auto* next = elem->next_;
+ Delete<TracedBuffer>(elem);
+ elem = next;
+ }
+ *head = nullptr;
+ GRPC_ERROR_UNREF(shutdown_err);
+}
+
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error)) {
+ timestamps_callback = fn;
+}
+} /* namespace grpc_core */
+
+#else /* GRPC_LINUX_ERRQUEUE */
+
+namespace grpc_core {
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error)) {
+ gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform");
+}
+} /* namespace grpc_core */
+
+#endif /* GRPC_LINUX_ERRQUEUE */
diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h
new file mode 100644
index 0000000000..cbbf50a657
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_list.h
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
+#define GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/time.h>
+
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/internal_errqueue.h"
+
+namespace grpc_core {
+struct Timestamps {
+ /* TODO(yashykt): This would also need to store OPTSTAT once support is added
+ */
+ gpr_timespec sendmsg_time;
+ gpr_timespec scheduled_time;
+ gpr_timespec sent_time;
+ gpr_timespec acked_time;
+};
+
+/** TracedBuffer is a class to keep track of timestamps for a specific buffer in
+ * the TCP layer. We are only tracking timestamps for Linux kernels and hence
+ * this class would only be used by Linux platforms. For all other platforms,
+ * TracedBuffer would be an empty class.
+ *
+ * The timestamps collected are according to grpc_core::Timestamps declared
+ * above.
+ *
+ * A TracedBuffer list is kept track of using the head element of the list. If
+ * the head element of the list is nullptr, then the list is empty.
+ */
+#ifdef GRPC_LINUX_ERRQUEUE
+class TracedBuffer {
+ public:
+ /** Add a new entry in the TracedBuffer list pointed to by head. Also saves
+ * sendmsg_time with the current timestamp. */
+ static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no,
+ void* arg);
+
+ /** Processes a received timestamp based on sock_extended_err and
+ * scm_timestamping structures. It will invoke the timestamps callback if the
+ * timestamp type is SCM_TSTAMP_ACK. */
+ static void ProcessTimestamp(grpc_core::TracedBuffer** head,
+ struct sock_extended_err* serr,
+ struct scm_timestamping* tss);
+
+ /** Cleans the list by calling the callback for each traced buffer in the list
+ * with timestamps that it has. */
+ static void Shutdown(grpc_core::TracedBuffer** head,
+ grpc_error* shutdown_err);
+
+ private:
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
+
+ TracedBuffer(int seq_no, void* arg)
+ : seq_no_(seq_no), arg_(arg), next_(nullptr) {}
+
+ uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
+ void* arg_; /* The arg to pass to timestamps_callback */
+ grpc_core::Timestamps ts_; /* The timestamps corresponding to this buffer */
+ grpc_core::TracedBuffer* next_; /* The next TracedBuffer in the list */
+};
+#else /* GRPC_LINUX_ERRQUEUE */
+class TracedBuffer {};
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+/** Sets the callback function to call when timestamps for a write are
+ * collected. The callback does not own a reference to error. */
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error));
+
+}; /* namespace grpc_core */
+
+#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H */
diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc
index 92e7930111..44fb47e19d 100644
--- a/src/core/lib/iomgr/endpoint.cc
+++ b/src/core/lib/iomgr/endpoint.cc
@@ -28,8 +28,8 @@ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
- ep->vtable->write(ep, slices, cb);
+ grpc_closure* cb, void* arg) {
+ ep->vtable->write(ep, slices, cb, arg);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 15db1649fa..1f590a80ca 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -33,10 +33,12 @@
typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
+class Timestamps;
struct grpc_endpoint_vtable {
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
- void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
+ void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
+ void* arg);
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
@@ -70,9 +72,11 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep);
\a slices may be mutated at will by the endpoint until cb is called.
No guarantee is made to the content of slices after a write EXCEPT that
it is a valid slice buffer.
+ \a arg is platform specific. It is currently only used by TCP on linux
+ platforms as an argument that would be forwarded to the timestamps callback.
*/
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb);
+ grpc_closure* cb, void* arg);
/* Causes any pending and future read/write callbacks to run immediately with
success==0 */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc
index c3bc0cc8fd..df2cf508c8 100644
--- a/src/core/lib/iomgr/endpoint_cfstream.cc
+++ b/src/core/lib/iomgr/endpoint_cfstream.cc
@@ -268,7 +268,7 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc
index 5c5c246f99..3afbfd7254 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.cc
+++ b/src/core/lib/iomgr/endpoint_pair_posix.cc
@@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args,
"socketpair-client");
gpr_free(final_name);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 66e0f1fd6d..aa5016bd8f 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -131,6 +131,13 @@ static void epoll_set_shutdown() {
* Fd Declarations
*/
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+ grpc_fd* fd;
+ grpc_fd* next;
+ grpc_fd* prev;
+};
+
struct grpc_fd {
int fd;
@@ -141,6 +148,9 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_iomgr_object iomgr_object;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
};
static void fd_global_init(void);
@@ -256,6 +266,10 @@ static bool append_error(grpc_error** composite, grpc_error* error,
static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fd* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
static void fd_global_shutdown(void) {
@@ -269,6 +283,38 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->next = fork_fd_list_head;
+ fd->fork_fd_list->prev = nullptr;
+ if (fork_fd_list_head != nullptr) {
+ fork_fd_list_head->fork_fd_list->prev = fd;
+ }
+ fork_fd_list_head = fd;
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
+static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ if (fork_fd_list_head == fd) {
+ fork_fd_list_head = fd->fork_fd_list->next;
+ }
+ if (fd->fork_fd_list->prev != nullptr) {
+ fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
+ }
+ if (fd->fork_fd_list->next != nullptr) {
+ fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
+ }
+ gpr_free(fd->fork_fd_list);
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
@@ -295,6 +341,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
char* fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
+ fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
@@ -361,6 +408,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ fork_fd_list_remove_grpc_fd(fd);
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
fd->error_closure->DestroyEvent();
@@ -1190,6 +1238,10 @@ static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
epoll_set_shutdown();
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_destroy(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+ }
}
static const grpc_event_engine_vtable vtable = {
@@ -1227,6 +1279,21 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};
+/* Called by the child process's post-fork handler to close open fds, including
+ * the global epoll fd. This allows gRPC to shutdown in the child process
+ * without interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+ gpr_mu_lock(&fork_fd_list_mu);
+ while (fork_fd_list_head != nullptr) {
+ close(fork_fd_list_head->fd);
+ fork_fd_list_head->fd = -1;
+ fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
+ }
+ gpr_mu_unlock(&fork_fd_list_mu);
+ shutdown_engine();
+ grpc_init_epoll1_linux(true);
+}
+
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
* support is available */
@@ -1248,6 +1315,11 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
return nullptr;
}
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_init(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(
+ reset_event_manager_on_fork);
+ }
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 96eae30345..b082634af1 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -46,6 +46,7 @@
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h"
@@ -735,7 +736,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("kick_one_worker", 0);
pollable* p = specific_worker->pollable_obj;
- grpc_core::mu_guard lock(&p->mu);
+ grpc_core::MutexLock lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
if (grpc_polling_trace.enabled()) {
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index fb4c71ef71..16562538a6 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -60,6 +60,19 @@ typedef struct grpc_fd_watcher {
grpc_fd* fd;
} grpc_fd_watcher;
+typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+ /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
+ set to nullptr. */
+ grpc_fd* fd;
+ grpc_cached_wakeup_fd* cached_wakeup_fd;
+
+ grpc_fork_fd_list* next;
+ grpc_fork_fd_list* prev;
+};
+
struct grpc_fd {
int fd;
/* refst format:
@@ -108,8 +121,18 @@ struct grpc_fd {
grpc_closure* on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
};
+/* True when GRPC_ENABLE_FORK_SUPPORT=1. We do not support fork with poll-cv */
+static bool track_fds_for_fork = false;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fork_fd_list* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
@@ -156,6 +179,9 @@ static void fd_unref(grpc_fd* fd);
typedef struct grpc_cached_wakeup_fd {
grpc_wakeup_fd fd;
struct grpc_cached_wakeup_fd* next;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;
struct grpc_pollset_worker {
@@ -281,9 +307,61 @@ poll_hash_table poll_cache;
grpc_cv_fd_table g_cvfds;
/*******************************************************************************
- * fd_posix.c
+ * functions to track opened fds. No-ops unless track_fds_for_fork is true.
*/
+static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
+ if (track_fds_for_fork) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ if (fork_fd_list_head == node) {
+ fork_fd_list_head = node->next;
+ }
+ if (node->prev != nullptr) {
+ node->prev->next = node->next;
+ }
+ if (node->next != nullptr) {
+ node->next->prev = node->prev;
+ }
+ gpr_free(node);
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
+static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ node->next = fork_fd_list_head;
+ node->prev = nullptr;
+ if (fork_fd_list_head != nullptr) {
+ fork_fd_list_head->prev = node;
+ }
+ fork_fd_list_head = node;
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+ if (track_fds_for_fork) {
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->fd = fd;
+ fd->fork_fd_list->cached_wakeup_fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
+ if (track_fds_for_fork) {
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->cached_wakeup_fd = fd;
+ fd->fork_fd_list->fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+ /*******************************************************************************
+ * fd_posix.c
+ */
+
#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
@@ -319,6 +397,7 @@ static void unref_by(grpc_fd* fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ fork_fd_list_remove_node(fd->fork_fd_list);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
@@ -347,6 +426,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
gpr_asprintf(&name2, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&r->iomgr_object, name2);
gpr_free(name2);
+ fork_fd_list_add_grpc_fd(r);
return r;
}
@@ -822,6 +902,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
+ fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
gpr_free(pollset->local_wakeup_cache);
pollset->local_wakeup_cache = next;
@@ -895,6 +976,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
gpr_malloc(sizeof(*worker.wakeup_fd)));
error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error;
@@ -1705,6 +1787,10 @@ static void shutdown_engine(void) {
if (grpc_cv_wakeup_fds_enabled()) {
global_cv_fd_table_shutdown();
}
+ if (track_fds_for_fork) {
+ gpr_mu_destroy(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+ }
}
static const grpc_event_engine_vtable vtable = {
@@ -1742,6 +1828,26 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};
+/* Called by the child process's post-fork handler to close open fds, including
+ * worker wakeup fds. This allows gRPC to shutdown in the child process without
+ * interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+ gpr_mu_lock(&fork_fd_list_mu);
+ while (fork_fd_list_head != nullptr) {
+ if (fork_fd_list_head->fd != nullptr) {
+ close(fork_fd_list_head->fd->fd);
+ fork_fd_list_head->fd->fd = -1;
+ } else {
+ close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
+ close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
+ }
+ fork_fd_list_head = fork_fd_list_head->next;
+ }
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
@@ -1750,6 +1856,12 @@ const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return nullptr;
}
+ if (grpc_core::Fork::Enabled()) {
+ track_fds_for_fork = true;
+ gpr_mu_init(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(
+ reset_event_manager_on_fork);
+ }
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 0e45fc42ca..d4377e2d50 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -59,7 +59,14 @@ grpc_core::DebugOnlyTraceFlag grpc_polling_api_trace(false, "polling_api");
/** Default poll() function - a pointer so that it can be overridden by some
* tests */
+#ifndef GPR_AIX
grpc_poll_function_type grpc_poll_function = poll;
+#else
+int aix_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
+ return poll(fds, nfds, timeout);
+}
+grpc_poll_function_type grpc_poll_function = aix_poll;
+#endif
grpc_wakeup_fd grpc_global_wakeup_fd;
@@ -101,10 +108,28 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
}
} // namespace
-static const event_engine_factory g_factories[] = {
+#define ENGINE_HEAD_CUSTOM "head_custom"
+#define ENGINE_TAIL_CUSTOM "tail_custom"
+
+// The global array of event-engine factories. Each entry is a pair with a name
+// and an event-engine generator function (nullptr if there is no generator
+// registered for this name). The middle entries are the engines predefined by
+// open-source gRPC. The head entries represent an opportunity for specific
+// high-priority custom pollers to be added by the initializer plugins of
+// custom-built gRPC libraries. The tail entries represent the same, but for
+// low-priority custom pollers. The actual poller selected is either the first
+// available one in the list if no specific poller is requested, or the first
+// specific poller that is requested by name in the GRPC_POLL_STRATEGY
+// environment variable if that variable is set (which should be a
+// comma-separated list of one or more event engine names)
+static event_engine_factory g_factories[] = {
+ {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
+ {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
{"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix},
{"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling},
+ {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
+ {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
};
static void add(const char* beg, const char* end, char*** ss, size_t* ns) {
@@ -138,7 +163,7 @@ static bool is(const char* want, const char* have) {
static void try_engine(const char* engine) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
- if (is(engine, g_factories[i].name)) {
+ if (g_factories[i].factory != nullptr && is(engine, g_factories[i].name)) {
if ((g_event_engine = g_factories[i].factory(
0 == strcmp(engine, g_factories[i].name)))) {
g_poll_strategy_name = g_factories[i].name;
@@ -149,14 +174,32 @@ static void try_engine(const char* engine) {
}
}
-/* This should be used for testing purposes ONLY */
-void grpc_set_event_engine_test_only(
- const grpc_event_engine_vtable* ev_engine) {
- g_event_engine = ev_engine;
-}
+/* Call this before calling grpc_event_engine_init() */
+void grpc_register_event_engine_factory(const char* name,
+ event_engine_factory_fn factory,
+ bool add_at_head) {
+ const char* custom_match =
+ add_at_head ? ENGINE_HEAD_CUSTOM : ENGINE_TAIL_CUSTOM;
-const grpc_event_engine_vtable* grpc_get_event_engine_test_only() {
- return g_event_engine;
+ // Overwrite an existing registration if already registered
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (0 == strcmp(name, g_factories[i].name)) {
+ g_factories[i].factory = factory;
+ return;
+ }
+ }
+
+ // Otherwise fill in an available custom slot
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (0 == strcmp(g_factories[i].name, custom_match)) {
+ g_factories[i].name = name;
+ g_factories[i].factory = factory;
+ return;
+ }
+ }
+
+ // Otherwise fail
+ GPR_ASSERT(false);
}
/* Call this only after calling grpc_event_engine_init() */
@@ -194,14 +237,19 @@ void grpc_event_engine_shutdown(void) {
}
bool grpc_event_engine_can_track_errors(void) {
+/* Only track errors if platform supports errqueue. */
+#ifdef GRPC_LINUX_ERRQUEUE
return g_event_engine->can_track_err;
+#else
+ return false;
+#endif /* GRPC_LINUX_ERRQUEUE */
}
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
- GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
- return g_event_engine->fd_create(fd, name, track_err);
+ return g_event_engine->fd_create(fd, name,
+ track_err && g_event_engine->can_track_err);
}
int grpc_fd_wrapped_fd(grpc_fd* fd) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 8d0bcc0710..b8fb8f534b 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -82,6 +82,11 @@ typedef struct grpc_event_engine_vtable {
void (*shutdown_engine)(void);
} grpc_event_engine_vtable;
+/* register a new event engine factory */
+void grpc_register_event_engine_factory(
+ const char* name, const grpc_event_engine_vtable* (*factory)(bool),
+ bool add_at_head);
+
void grpc_event_engine_init(void);
void grpc_event_engine_shutdown(void);
@@ -173,9 +178,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
-/* WARNING: The following two functions should be used for testing purposes
- * ONLY */
-void grpc_set_event_engine_test_only(const grpc_event_engine_vtable*);
-const grpc_event_engine_vtable* grpc_get_event_engine_test_only();
-
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 5d5c355ff9..d68fa0714b 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -109,6 +109,12 @@ grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
namespace grpc_core {
GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_);
+// WARNING: for testing purposes only!
+void ExecCtx::TestOnlyGlobalInit(gpr_timespec new_val) {
+ g_start_time = new_val;
+ gpr_tls_init(&exec_ctx_);
+}
+
void ExecCtx::GlobalInit(void) {
g_start_time = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_tls_init(&exec_ctx_);
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 8ddab0d381..f3528d527a 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -192,6 +192,8 @@ class ExecCtx {
now_is_valid_ = true;
}
+ static void TestOnlyGlobalInit(gpr_timespec new_val);
+
/** Global initialization for ExecCtx. Called by iomgr. */
static void GlobalInit(void);
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index b37384b8db..e957bad73d 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -25,6 +25,7 @@
#include <string.h>
#include <grpc/fork.h>
+#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/lib/gpr/env.h"
@@ -34,7 +35,6 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/surface/init.h"
/*
* NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
@@ -58,6 +58,12 @@ void grpc_prefork() {
"environment variable GRPC_ENABLE_FORK_SUPPORT=1");
return;
}
+ if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 &&
+ strcmp(grpc_get_poll_strategy_name(), "poll") != 0) {
+ gpr_log(GPR_ERROR,
+ "Fork support is only compatible with the epoll1 and poll polling "
+ "strategies");
+ }
if (!grpc_core::Fork::BlockExecCtx()) {
gpr_log(GPR_INFO,
"Other threads are currently calling into gRPC, skipping fork() "
@@ -84,6 +90,11 @@ void grpc_postfork_child() {
if (!skipped_handler) {
grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx;
+ grpc_core::Fork::child_postfork_func reset_polling_engine =
+ grpc_core::Fork::GetResetChildPollingEngineFunc();
+ if (reset_polling_engine != nullptr) {
+ reset_polling_engine();
+ }
grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true);
}
diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc
new file mode 100644
index 0000000000..99c22e9055
--- /dev/null
+++ b/src/core/lib/iomgr/internal_errqueue.cc
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include "src/core/lib/iomgr/internal_errqueue.h"
+
+#ifdef GRPC_POSIX_SOCKET_TCP
+
+bool kernel_supports_errqueue() {
+#ifdef LINUX_VERSION_CODE
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0)
+ return true;
+#endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */
+#endif /* LINUX_VERSION_CODE */
+ return false;
+}
+
+#endif /* GRPC_POSIX_SOCKET_TCP */
diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h
new file mode 100644
index 0000000000..9d122808f9
--- /dev/null
+++ b/src/core/lib/iomgr/internal_errqueue.h
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* This file contains constants defined in <linux/errqueue.h> and
+ * <linux/net_tstamp.h> so as to allow collecting network timestamps in the
+ * kernel. This file allows tcp_posix.cc to compile on platforms that do not
+ * have <linux/errqueue.h> and <linux/net_tstamp.h>.
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
+#define GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_POSIX_SOCKET_TCP
+
+#include <sys/types.h>
+#include <time.h>
+
+#ifdef GRPC_LINUX_ERRQUEUE
+#include <linux/errqueue.h>
+#include <linux/net_tstamp.h>
+#include <sys/socket.h>
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+namespace grpc_core {
+
+#ifdef GRPC_LINUX_ERRQUEUE
+
+/* Redefining scm_timestamping in the same way that <linux/errqueue.h> defines
+ * it, so that code compiles on systems that don't have it. */
+struct scm_timestamping {
+ struct timespec ts[3];
+};
+/* Also redefine timestamp types */
+/* The timestamp type for when the driver passed skb to NIC, or HW. */
+constexpr int SCM_TSTAMP_SND = 0;
+/* The timestamp type for when data entered the packet scheduler. */
+constexpr int SCM_TSTAMP_SCHED = 1;
+/* The timestamp type for when data acknowledged by peer. */
+constexpr int SCM_TSTAMP_ACK = 2;
+/* Redefine required constants from <linux/net_tstamp.h> */
+constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1;
+constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4;
+constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7;
+constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8;
+constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9;
+constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11;
+
+constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE |
+ SOF_TIMESTAMPING_OPT_ID |
+ SOF_TIMESTAMPING_OPT_TSONLY;
+constexpr uint32_t kTimestampingRecordingOptions =
+ SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE |
+ SOF_TIMESTAMPING_TX_ACK;
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+/* Returns true if kernel is capable of supporting errqueue and timestamping.
+ * Currently allowing only linux kernels above 4.0.0
+ */
+bool kernel_supports_errqueue();
+} // namespace grpc_core
+
+#endif /* GRPC_POSIX_SOCKET_TCP */
+
+#endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 1d0ecff802..abf96662f5 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -60,6 +60,11 @@
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
#define GRPC_HAVE_UNIX_SOCKET 1
+#ifdef LINUX_VERSION_CODE
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0)
+#define GRPC_LINUX_ERRQUEUE 1
+#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) */
+#endif /* LINUX_VERSION_CODE */
#define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_HOST_NAME_MAX 1
@@ -140,6 +145,18 @@
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
+#elif defined(GPR_SOLARIS)
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#elif defined(GPR_AIX)
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
#elif defined(GPR_NACL)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index 539bc120ce..b6fc7579f7 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -96,6 +96,9 @@ struct grpc_resource_user {
list, false otherwise */
bool added_to_free_pool;
+ /* The number of threads currently allocated to this resource user */
+ gpr_atm num_threads_allocated;
+
/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure* reclaimers[2];
@@ -135,12 +138,33 @@ struct grpc_resource_quota {
gpr_atm last_size;
+ /* Mutex to protect max_threads and num_threads_allocated */
+ /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
+ * and avoid having this mutex; but in that case, each invocation of the
+ * function grpc_resource_user_allocate_threads() would have had to do at
+ * least two atomic loads (for max_threads and num_threads_allocated) followed
+ * by a CAS (on num_threads_allocated).
+ * Moreover, we expect grpc_resource_user_allocate_threads() to be often
+ * called concurrently thereby increasing the chances of failing the CAS
+ * operation. This additional complexity is not worth the tiny perf gain we
+ * may (or may not) have by using atomics */
+ gpr_mu thread_count_mu;
+
+ /* Max number of threads allowed */
+ int max_threads;
+
+ /* Number of threads currently allocated via this resource_quota object */
+ int num_threads_allocated;
+
/* Has rq_step been scheduled to occur? */
bool step_scheduled;
+
/* Are we currently reclaiming memory */
bool reclaiming;
+
/* Closure around rq_step */
grpc_closure rq_step_closure;
+
/* Closure around rq_reclamation_done */
grpc_closure rq_reclamation_done_closure;
@@ -524,6 +548,11 @@ static void ru_shutdown(void* ru, grpc_error* error) {
static void ru_destroy(void* ru, grpc_error* error) {
grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
+ // Free all the remaining thread quota
+ grpc_resource_user_free_threads(resource_user,
+ static_cast<int>(gpr_atm_no_barrier_load(
+ &resource_user->num_threads_allocated)));
+
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, static_cast<grpc_rulist>(i));
}
@@ -594,6 +623,9 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
+ gpr_mu_init(&resource_quota->thread_count_mu);
+ resource_quota->max_threads = INT_MAX;
+ resource_quota->num_threads_allocated = 0;
resource_quota->step_scheduled = false;
resource_quota->reclaiming = false;
gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
@@ -616,6 +648,8 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
if (gpr_unref(&resource_quota->refs)) {
+ // No outstanding thread quota
+ GPR_ASSERT(resource_quota->num_threads_allocated == 0);
GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
gpr_free(resource_quota->name);
gpr_free(resource_quota);
@@ -647,6 +681,15 @@ double grpc_resource_quota_get_memory_pressure(
}
/* Public API */
+void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
+ int new_max_threads) {
+ GPR_ASSERT(new_max_threads >= 0);
+ gpr_mu_lock(&resource_quota->thread_count_mu);
+ resource_quota->max_threads = new_max_threads;
+ gpr_mu_unlock(&resource_quota->thread_count_mu);
+}
+
+/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
size_t size) {
grpc_core::ExecCtx exec_ctx;
@@ -731,6 +774,7 @@ grpc_resource_user* grpc_resource_user_create(
grpc_closure_list_init(&resource_user->on_allocated);
resource_user->allocating = false;
resource_user->added_to_free_pool = false;
+ gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
resource_user->reclaimers[0] = nullptr;
resource_user->reclaimers[1] = nullptr;
resource_user->new_reclaimers[0] = nullptr;
@@ -785,6 +829,40 @@ void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
}
}
+bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
+ int thread_count) {
+ GPR_ASSERT(thread_count >= 0);
+ bool is_success = false;
+ gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
+ grpc_resource_quota* rq = resource_user->resource_quota;
+ if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
+ rq->num_threads_allocated += thread_count;
+ gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
+ thread_count);
+ is_success = true;
+ }
+ gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
+ return is_success;
+}
+
+void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
+ int thread_count) {
+ GPR_ASSERT(thread_count >= 0);
+ gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
+ grpc_resource_quota* rq = resource_user->resource_quota;
+ rq->num_threads_allocated -= thread_count;
+ int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
+ &resource_user->num_threads_allocated, -thread_count));
+ if (old_count < thread_count || rq->num_threads_allocated < 0) {
+ gpr_log(GPR_ERROR,
+ "Releasing more threads (%d) than currently allocated (rq threads: "
+ "%d, ru threads: %d)",
+ thread_count, rq->num_threads_allocated + thread_count, old_count);
+ abort();
+ }
+ gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
+}
+
void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
grpc_closure* optional_on_done) {
gpr_mu_lock(&resource_user->mu);
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index 937daf8728..7b0ed7417a 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -93,6 +93,22 @@ void grpc_resource_user_ref(grpc_resource_user* resource_user);
void grpc_resource_user_unref(grpc_resource_user* resource_user);
void grpc_resource_user_shutdown(grpc_resource_user* resource_user);
+/* Attempts to get quota from the resource_user to create 'thread_count' number
+ * of threads. Returns true if successful (i.e the caller is now free to create
+ * 'thread_count' number of threads) or false if quota is not available */
+bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
+ int thread_count);
+/* Releases 'thread_count' worth of quota back to the resource user. The quota
+ * should have been previously obtained successfully by calling
+ * grpc_resource_user_allocate_threads().
+ *
+ * Note: There need not be an exact one-to-one correspondence between
+ * grpc_resource_user_allocate_threads() and grpc_resource_user_free_threads()
+ * calls. The only requirement is that the number of threads allocated should
+ * all be eventually released */
+void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
+ int thread_count);
+
/* Allocate from the resource user (and its quota).
If optional_on_done is NULL, then allocate immediately. This may push the
quota over-limit, at which point reclamation will kick in.
diff --git a/src/core/lib/iomgr/socket_mutator.cc b/src/core/lib/iomgr/socket_mutator.cc
index b9b8eaf4ad..a448c9f61c 100644
--- a/src/core/lib/iomgr/socket_mutator.cc
+++ b/src/core/lib/iomgr/socket_mutator.cc
@@ -57,7 +57,7 @@ int grpc_socket_mutator_compare(grpc_socket_mutator* a,
void grpc_socket_mutator_unref(grpc_socket_mutator* mutator) {
if (gpr_unref(&mutator->refcount)) {
- mutator->vtable->destory(mutator);
+ mutator->vtable->destroy(mutator);
}
}
diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h
index 6c7781c51d..8742a3ba61 100644
--- a/src/core/lib/iomgr/socket_mutator.h
+++ b/src/core/lib/iomgr/socket_mutator.h
@@ -33,7 +33,7 @@ typedef struct {
/** Compare socket mutator \a a and \a b */
int (*compare)(grpc_socket_mutator* a, grpc_socket_mutator* b);
/** Destroys the socket mutator instance */
- void (*destory)(grpc_socket_mutator* mutator);
+ void (*destroy)(grpc_socket_mutator* mutator);
} grpc_socket_mutator_vtable;
/** The Socket Mutator interface allows changes on socket options */
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 296ee74311..9c989b7dfe 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -279,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- *fdobj = grpc_fd_create(fd, name, false);
+ *fdobj = grpc_fd_create(fd, name, true);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;
diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc
index 990e8d632b..e02a1898f2 100644
--- a/src/core/lib/iomgr/tcp_custom.cc
+++ b/src/core/lib/iomgr/tcp_custom.cc
@@ -221,7 +221,7 @@ static void custom_write_callback(grpc_custom_socket* socket,
}
static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index b53ffbf01c..ac1e919acb 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -27,7 +27,9 @@
#include <errno.h>
#include <limits.h>
+#include <netinet/in.h>
#include <stdbool.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
@@ -46,6 +48,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
@@ -97,17 +100,42 @@ struct grpc_tcp {
grpc_closure read_done_closure;
grpc_closure write_done_closure;
+ grpc_closure error_closure;
char* peer_string;
grpc_resource_user* resource_user;
grpc_resource_user_slice_allocator slice_allocator;
+
+ grpc_core::TracedBuffer* tb_head; /* List of traced buffers */
+ gpr_mu tb_mu; /* Lock for access to list of traced buffers */
+
+ /* grpc_endpoint_write takes an argument which if non-null means that the
+ * transport layer wants the TCP layer to collect timestamps for this write.
+ * This arg is forwarded to the timestamps callback function when the ACK
+ * timestamp is received from the kernel. This arg is a (void *) which allows
+ * users of this API to pass in a pointer to any kind of structure. This
+ * structure could actually be a tag or any book-keeping object that the user
+ * can use to distinguish between different traced writes. The only
+ * requirement from the TCP endpoint layer is that this arg should be non-null
+ * if the user wants timestamps for the write. */
+ void* outgoing_buffer_arg;
+ /* A counter which starts at 0. It is initialized the first time the socket
+ * options for collecting timestamps are set, and is incremented with each
+ * byte sent. */
+ int bytes_counter;
+ bool socket_ts_enabled; /* True if timestamping options are set on the socket
+ */
+ gpr_atm
+ stop_error_notification; /* Set to 1 if we do not want to be notified on
+ errors anymore */
};
struct backup_poller {
gpr_mu* pollset_mu;
grpc_closure run_poller;
};
+
} // namespace
#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
@@ -302,6 +330,7 @@ static void tcp_free(grpc_tcp* tcp) {
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
+ gpr_mu_destroy(&tcp->tb_mu);
gpr_free(tcp);
}
@@ -347,6 +376,10 @@ static void tcp_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ if (grpc_event_engine_can_track_errors()) {
+ gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
+ grpc_fd_set_error(tcp->em_fd);
+ }
TCP_UNREF(tcp, "destroy");
}
@@ -513,6 +546,235 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
}
}
+/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
+ * of bytes sent. */
+ssize_t tcp_send(int fd, const struct msghdr* msg) {
+ GPR_TIMER_SCOPE("sendmsg", 1);
+ ssize_t sent_length;
+ do {
+ /* TODO(klempner): Cork if this is a partial write */
+ GRPC_STATS_INC_SYSCALL_WRITE();
+ sent_length = sendmsg(fd, msg, SENDMSG_FLAGS);
+ } while (sent_length < 0 && errno == EINTR);
+ return sent_length;
+}
+
+/** This is to be called if outgoing_buffer_arg is not null. On linux platforms,
+ * this will call sendmsg with socket options set to collect timestamps inside
+ * the kernel. On return, sent_length is set to the return value of the sendmsg
+ * call. Returns false if setting the socket options failed. This is not
+ * implemented for non-linux platforms currently, and crashes out.
+ */
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length, grpc_error** error);
+
+/** The callback function to be invoked when we get an error on the socket. */
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
+
+#ifdef GRPC_LINUX_ERRQUEUE
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length,
+ grpc_error** error) {
+ if (!tcp->socket_ts_enabled) {
+ uint32_t opt = grpc_core::kTimestampingSocketOptions;
+ if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
+ static_cast<void*>(&opt), sizeof(opt)) != 0) {
+ *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp);
+ grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket.");
+ }
+ return false;
+ }
+ tcp->bytes_counter = -1;
+ tcp->socket_ts_enabled = true;
+ }
+ /* Set control message to indicate that you want timestamps. */
+ union {
+ char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
+ struct cmsghdr align;
+ } u;
+ cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SO_TIMESTAMPING;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
+ *reinterpret_cast<int*>(CMSG_DATA(cmsg)) =
+ grpc_core::kTimestampingRecordingOptions;
+ msg->msg_control = u.cmsg_buf;
+ msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
+
+ /* If there was an error on sendmsg the logic in tcp_flush will handle it. */
+ ssize_t length = tcp_send(tcp->fd, msg);
+ *sent_length = length;
+ /* Only save timestamps if all the bytes were taken by sendmsg. */
+ if (sending_length == static_cast<size_t>(length)) {
+ gpr_mu_lock(&tcp->tb_mu);
+ grpc_core::TracedBuffer::AddNewEntry(
+ &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length),
+ tcp->outgoing_buffer_arg);
+ gpr_mu_unlock(&tcp->tb_mu);
+ tcp->outgoing_buffer_arg = nullptr;
+ }
+ return true;
+}
+
+/** Reads \a cmsg to derive timestamps from the control messages. If a valid
+ * timestamp is found, the traced buffer list is updated with this timestamp.
+ * The caller of this function should be looping on the control messages found
+ * in \a msg. \a cmsg should point to the control message that the caller wants
+ * processed.
+ * On return, a pointer to a control message is returned. On the next iteration,
+ * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */
+struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
+ struct cmsghdr* cmsg) {
+ auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
+ if (next_cmsg == nullptr) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Received timestamp without extended error");
+ }
+ return cmsg;
+ }
+
+ if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
+ !(next_cmsg->cmsg_type == IP_RECVERR ||
+ next_cmsg->cmsg_type == IPV6_RECVERR)) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Unexpected control message");
+ }
+ return cmsg;
+ }
+
+ auto tss =
+ reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg));
+ auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
+ if (serr->ee_errno != ENOMSG ||
+ serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
+ gpr_log(GPR_ERROR, "Unexpected control message");
+ return cmsg;
+ }
+ /* The error handling can potentially be done on another thread so we need
+ * to protect the traced buffer list. A lock free list might be better. Using
+ * a simple mutex for now. */
+ gpr_mu_lock(&tcp->tb_mu);
+ grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss);
+ gpr_mu_unlock(&tcp->tb_mu);
+ return next_cmsg;
+}
+
+/** For linux platforms, reads the socket's error queue and processes error
+ * messages from the queue. Returns true if all the errors processed were
+ * timestamps. Returns false if any of the errors were not timestamps. For
+ * non-linux platforms, error processing is not used/enabled currently.
+ */
+static bool process_errors(grpc_tcp* tcp) {
+ while (true) {
+ struct iovec iov;
+ iov.iov_base = nullptr;
+ iov.iov_len = 0;
+ struct msghdr msg;
+ msg.msg_name = nullptr;
+ msg.msg_namelen = 0;
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 0;
+ msg.msg_flags = 0;
+
+ union {
+ char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) +
+ CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/];
+ struct cmsghdr align;
+ } aligned_buf;
+ memset(&aligned_buf, 0, sizeof(aligned_buf));
+
+ msg.msg_control = aligned_buf.rbuf;
+ msg.msg_controllen = sizeof(aligned_buf.rbuf);
+
+ int r, saved_errno;
+ do {
+ r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
+ saved_errno = errno;
+ } while (r < 0 && saved_errno == EINTR);
+
+ if (r == -1 && saved_errno == EAGAIN) {
+ return true; /* No more errors to process */
+ }
+ if (r == -1) {
+ return false;
+ }
+ if (grpc_tcp_trace.enabled()) {
+ if ((msg.msg_flags & MSG_CTRUNC) == 1) {
+ gpr_log(GPR_INFO, "Error message was truncated.");
+ }
+ }
+
+ if (msg.msg_controllen == 0) {
+ /* There was no control message found. It was probably spurious. */
+ return true;
+ }
+ for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
+ cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level != SOL_SOCKET ||
+ cmsg->cmsg_type != SCM_TIMESTAMPING) {
+ /* Got a control message that is not a timestamp. Don't know how to
+ * handle this. */
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "unknown control message cmsg_level:%d cmsg_type:%d",
+ cmsg->cmsg_level, cmsg->cmsg_type);
+ }
+ return false;
+ }
+ process_timestamp(tcp, &msg, cmsg);
+ }
+ }
+}
+
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+ grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
+ }
+
+ if (error != GRPC_ERROR_NONE ||
+ static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
+ /* We aren't going to register to hear on error anymore, so it is safe to
+ * unref. */
+ grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error));
+ TCP_UNREF(tcp, "error-tracking");
+ return;
+ }
+
+ /* We are still interested in collecting timestamps, so let's try reading
+ * them. */
+ if (!process_errors(tcp)) {
+ /* This was not a timestamps error. This was an actual error. Set the
+ * read and write closures to be ready.
+ */
+ grpc_fd_set_readable(tcp->em_fd);
+ grpc_fd_set_writable(tcp->em_fd);
+ }
+ GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
+}
+
+#else /* GRPC_LINUX_ERRQUEUE */
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length,
+ grpc_error** error) {
+ gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
+ GPR_ASSERT(0);
+ return false;
+}
+
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+ gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
+ GPR_ASSERT(0);
+}
+#endif /* GRPC_LINUX_ERRQUEUE */
+
/* returns true if done, false if pending; if returning true, *error is set */
#if defined(IOV_MAX) && IOV_MAX < 1000
#define MAX_WRITE_IOVEC IOV_MAX
@@ -557,19 +819,20 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iov_size;
- msg.msg_control = nullptr;
- msg.msg_controllen = 0;
msg.msg_flags = 0;
+ if (tcp->outgoing_buffer_arg != nullptr) {
+ if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
+ error))
+ return true; /* something went wrong with timestamps */
+ } else {
+ msg.msg_control = nullptr;
+ msg.msg_controllen = 0;
- GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
- GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
+ GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
+ GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
- GPR_TIMER_SCOPE("sendmsg", 1);
- do {
- /* TODO(klempner): Cork if this is a partial write */
- GRPC_STATS_INC_SYSCALL_WRITE();
- sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
- } while (sent_length < 0 && errno == EINTR);
+ sent_length = tcp_send(tcp->fd, &msg);
+ }
if (sent_length < 0) {
if (errno == EAGAIN) {
@@ -593,6 +856,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
}
GPR_ASSERT(tcp->outgoing_byte_idx == 0);
+ tcp->bytes_counter += sent_length;
trailing = sending_length - static_cast<size_t>(sent_length);
while (trailing > 0) {
size_t slice_length;
@@ -607,7 +871,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
trailing -= slice_length;
}
}
-
if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
*error = GRPC_ERROR_NONE;
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
@@ -640,14 +903,13 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
-
GRPC_CLOSURE_SCHED(cb, error);
TCP_UNREF(tcp, "write");
}
}
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("tcp_write", 0);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_error* error = GRPC_ERROR_NONE;
@@ -675,6 +937,10 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
tcp->outgoing_buffer = buf;
tcp->outgoing_byte_idx = 0;
+ tcp->outgoing_buffer_arg = arg;
+ if (arg) {
+ GPR_ASSERT(grpc_event_engine_can_track_errors());
+ }
if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
@@ -792,6 +1058,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->bytes_read_this_round = 0;
/* Will be set to false by the very first endpoint read function */
tcp->is_first_read = true;
+ tcp->bytes_counter = -1;
+ tcp->socket_ts_enabled = false;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
@@ -803,6 +1071,19 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
grpc_resource_quota_unref_internal(resource_quota);
+ gpr_mu_init(&tcp->tb_mu);
+ tcp->tb_head = nullptr;
+ /* Start being notified on errors if event engine can track errors. */
+ if (grpc_event_engine_can_track_errors()) {
+ /* Grab a ref to tcp so that we can safely access the tcp struct when
+ * processing errors. We unref when we no longer want to track errors
+ * separately. */
+ TCP_REF(tcp, "error-tracking");
+ gpr_atm_rel_store(&tcp->stop_error_notification, 0);
+ GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
+ }
return &tcp->base;
}
@@ -821,6 +1102,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
tcp->release_fd = fd;
tcp->release_fd_cb = done;
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ if (grpc_event_engine_can_track_errors()) {
+ /* Stop errors notification. */
+ gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
+ grpc_fd_set_error(tcp->em_fd);
+ }
TCP_UNREF(tcp, "destroy");
}
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index af89bd24db..eff825cb92 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -31,7 +31,10 @@
#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
+
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 8ddf684fea..824db07fbf 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- grpc_fd* fdobj = grpc_fd_create(fd, name, false);
+ grpc_fd* fdobj = grpc_fd_create(fd, name, true);
read_notifier_pollset =
sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
@@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name, false);
+ sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index b9f8145572..9595c028ce 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd,
s->tail = sp;
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name, false);
+ sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index b3cb442f18..64c4a56ae9 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -296,7 +296,7 @@ static void on_write(void* tcpp, grpc_error* error) {
/* Initiates a write. */
static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* socket = tcp->socket;
grpc_winsocket_callback_info* info = &socket->write_info;
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index 7f534476df..17e933b865 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -61,10 +61,11 @@ typedef struct grpc_timer_vtable {
/* Initialize *timer. When expired or canceled, closure will be called with
error set to indicate if it expired (GRPC_ERROR_NONE) or was canceled
- (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called exactly once, and
+ (GRPC_ERROR_CANCELLED). *closure is guaranteed to be called exactly once, and
application code should check the error to determine how it was invoked. The
application callback is also responsible for maintaining information about
- when to free up any user-level state. */
+ when to free up any user-level state. Behavior is undefined for a deadline of
+ GRPC_MILLIS_INF_FUTURE. */
void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
grpc_closure* closure);
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index bdb2d0e764..3dd7cab855 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
- emfd_ = grpc_fd_create(fd, name, false);
+ emfd_ = grpc_fd_create(fd, name, true);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);
diff --git a/src/core/lib/security/credentials/jwt/json_token.h b/src/core/lib/security/credentials/jwt/json_token.h
index d0fb4ebd0a..3ed990140d 100644
--- a/src/core/lib/security/credentials/jwt/json_token.h
+++ b/src/core/lib/security/credentials/jwt/json_token.h
@@ -21,6 +21,8 @@
#include <grpc/support/port_platform.h>
+#include "src/core/tsi/grpc_shadow_boringssl.h"
+
#include <grpc/slice.h>
#include <openssl/rsa.h>
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.cc b/src/core/lib/security/credentials/jwt/jwt_verifier.cc
index 5c47276e32..c7d1b36ff0 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.cc
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.cc
@@ -18,6 +18,8 @@
#include <grpc/support/port_platform.h>
+#include "src/core/tsi/grpc_shadow_boringssl.h"
+
#include "src/core/lib/security/credentials/jwt/jwt_verifier.h"
#include <limits.h>
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
index 43dd68e874..44b093557f 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
@@ -235,7 +235,7 @@ static void on_oauth2_token_fetcher_http_response(void* user_data,
access_token_md);
} else {
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Error occured when fetching oauth2 token.", &error, 1);
+ "Error occurred when fetching oauth2 token.", &error, 1);
}
GRPC_CLOSURE_SCHED(pending_request->on_request_metadata, error);
grpc_polling_entity_del_from_pollset_set(
diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc
index 840b2e73bc..f40f969bb7 100644
--- a/src/core/lib/security/transport/secure_endpoint.cc
+++ b/src/core/lib/security/transport/secure_endpoint.cc
@@ -254,7 +254,7 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
}
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
unsigned i;
@@ -342,7 +342,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
return;
}
- grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
+ grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
}
static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc
index aff723ed04..d76d582638 100644
--- a/src/core/lib/security/transport/security_handshaker.cc
+++ b/src/core/lib/security/transport/security_handshaker.cc
@@ -259,7 +259,7 @@ static grpc_error* on_handshake_next_done_locked(
grpc_slice_buffer_reset_and_unref_internal(&h->outgoing);
grpc_slice_buffer_add(&h->outgoing, to_send);
grpc_endpoint_write(h->args->endpoint, &h->outgoing,
- &h->on_handshake_data_sent_to_peer);
+ &h->on_handshake_data_sent_to_peer, nullptr);
} else if (handshaker_result == nullptr) {
// There is nothing to send, but need to read from peer.
grpc_endpoint_read(h->args->endpoint, h->args->read_buffer,
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index d32281e076..826c0fb834 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -1972,7 +1972,7 @@ done:
return error;
done_with_error:
- /* reverse any mutations that occured */
+ /* reverse any mutations that occurred */
if (stream_op->send_initial_metadata) {
call->sent_initial_metadata = false;
grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index 6543796b4c..f01852473b 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -21,37 +21,11 @@
#include <grpc/support/port_platform.h>
-#include <limits.h>
-
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
-// Priority for channel registration functions to be used in
-// grpc_channel_init_register_stage(). The priority dictates the
-// order in which the registration functions run.
-//
-// When used to register a filter, the filter can either be appended or
-// prepended, thus dictating whether the filter goes at the top or bottom of
-// the stack. Higher priority functions can get closer to the top or bottom
-// of the stack than lower priority functions.
-enum {
- // Default level. Most of filters should use this level if their location in
- // the stack does not matter.
- GRPC_CHANNEL_INIT_PRIORITY_LOW = 0,
- // For filters that should be added after the group of filters with default
- // priority, such as auth filters.
- GRPC_CHANNEL_INIT_PRIORITY_MED = 10000,
- // For filters that need to be close to top or bottom, such as protocol-level
- // filters (client_authority, http-client, http-server).
- GRPC_CHANNEL_INIT_PRIORITY_HIGH = 20000,
- // For filters that need to be very close to the wire or surface, such as
- // stats filters (census).
- GRPC_CHANNEL_INIT_PRIORITY_VERY_HIGH = 30000,
- // For things that have to happen last, such as connected channel filter or
- // surface server filter. Consider as reserved for gRPC internals.
- GRPC_CHANNEL_INIT_PRIORITY_MAX = INT_MAX
-};
+#define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000
/// This module provides a way for plugins (and the grpc core library itself)
/// to register mutators for channel stacks.
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 7da9e6b74c..0769d9e4f6 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -184,7 +184,7 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
- void (*init)(void* data);
+ void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback);
void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
@@ -253,6 +253,23 @@ typedef struct cq_pluck_data {
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
} cq_pluck_data;
+typedef struct cq_callback_data {
+ /** No actual completed events queue, unlike other types */
+
+ /** Number of pending events (+1 if we're not shutdown) */
+ gpr_atm pending_events;
+
+ /** Counter of how many things have ever been queued on this completion queue
+ useful for avoiding locks to check the queue */
+ gpr_atm things_queued_ever;
+
+ /** 0 initially. 1 once we initiated shutdown */
+ bool shutdown_called;
+
+ /** A callback that gets invoked when the CQ completes shutdown */
+ grpc_core::CQCallbackInterface* shutdown_callback;
+} cq_callback_data;
+
/* Completion queue structure */
struct grpc_completion_queue {
/** Once owning_refs drops to zero, we will destroy the cq */
@@ -276,12 +293,21 @@ struct grpc_completion_queue {
/* Forward declarations */
static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
+static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq);
+static void cq_shutdown_callback(grpc_completion_queue* cq);
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
-
+static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
+
+// A cq_end_op function is called when an operation on a given CQ with
+// a given tag has completed. The storage argument is a reference to the
+// space reserved for this completion as it is placed into the corresponding
+// queue. The done argument is a callback that will be invoked when it is
+// safe to free up that storage. The storage MUST NOT be freed until the
+// done callback is invoked.
static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_error* error,
void (*done)(void* done_arg,
@@ -294,16 +320,28 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
+static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
+
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved);
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
-static void cq_init_next(void* data);
-static void cq_init_pluck(void* data);
+// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
+static void cq_init_next(void* data,
+ grpc_core::CQCallbackInterface* shutdown_callback);
+static void cq_init_pluck(void* data,
+ grpc_core::CQCallbackInterface* shutdown_callback);
+static void cq_init_callback(void* data,
+ grpc_core::CQCallbackInterface* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
+static void cq_destroy_callback(void* data);
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
@@ -315,6 +353,10 @@ static const cq_vtable g_cq_vtable[] = {
{GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
cq_pluck},
+ /* GRPC_CQ_CALLBACK */
+ {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
+ cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
+ cq_end_op_for_callback, nullptr, nullptr},
};
#define DATA_FROM_CQ(cq) ((void*)(cq + 1))
@@ -419,8 +461,8 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
}
grpc_completion_queue* grpc_completion_queue_create_internal(
- grpc_cq_completion_type completion_type,
- grpc_cq_polling_type polling_type) {
+ grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
+ grpc_core::CQCallbackInterface* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
grpc_completion_queue* cq;
@@ -448,15 +490,16 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
gpr_ref_init(&cq->owning_refs, 2);
poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
- vtable->init(DATA_FROM_CQ(cq));
+ vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
grpc_schedule_on_exec_ctx);
return cq;
}
-static void cq_init_next(void* ptr) {
- cq_next_data* cqd = static_cast<cq_next_data*>(ptr);
+static void cq_init_next(void* data,
+ grpc_core::CQCallbackInterface* shutdown_callback) {
+ cq_next_data* cqd = static_cast<cq_next_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
@@ -464,14 +507,15 @@ static void cq_init_next(void* ptr) {
cq_event_queue_init(&cqd->queue);
}
-static void cq_destroy_next(void* ptr) {
- cq_next_data* cqd = static_cast<cq_next_data*>(ptr);
+static void cq_destroy_next(void* data) {
+ cq_next_data* cqd = static_cast<cq_next_data*>(data);
GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void* ptr) {
- cq_pluck_data* cqd = static_cast<cq_pluck_data*>(ptr);
+static void cq_init_pluck(void* data,
+ grpc_core::CQCallbackInterface* shutdown_callback) {
+ cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
@@ -482,11 +526,23 @@ static void cq_init_pluck(void* ptr) {
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
}
-static void cq_destroy_pluck(void* ptr) {
- cq_pluck_data* cqd = static_cast<cq_pluck_data*>(ptr);
+static void cq_destroy_pluck(void* data) {
+ cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
}
+static void cq_init_callback(
+ void* data, grpc_core::CQCallbackInterface* shutdown_callback) {
+ cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
+ /* Initial count is dropped by grpc_completion_queue_shutdown */
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
+ cqd->shutdown_called = false;
+ gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+ cqd->shutdown_callback = shutdown_callback;
+}
+
+static void cq_destroy_callback(void* data) {}
+
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
return cq->vtable->cq_completion_type;
}
@@ -596,6 +652,11 @@ static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
return atm_inc_if_nonzero(&cqd->pending_events);
}
+static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) {
+ cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
+ return atm_inc_if_nonzero(&cqd->pending_events);
+}
+
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
@@ -759,6 +820,48 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
GRPC_ERROR_UNREF(error);
}
+/* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
+static void cq_end_op_for_callback(
+ grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
+ grpc_cq_completion* storage) {
+ GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
+
+ cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
+ bool is_success = (error == GRPC_ERROR_NONE);
+
+ if (grpc_api_trace.enabled() ||
+ (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
+ const char* errmsg = grpc_error_string(error);
+ GRPC_API_TRACE(
+ "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
+ "done=%p, done_arg=%p, storage=%p)",
+ 6, (cq, tag, errmsg, done, done_arg, storage));
+ if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
+ }
+ }
+
+ // The callback-based CQ isn't really a queue at all and thus has no need
+ // for reserved storage. Invoke the done callback right away to release it.
+ done(done_arg, storage);
+
+ gpr_mu_lock(cq->mu);
+ cq_check_tag(cq, tag, false); /* Used in debug builds only */
+
+ gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_callback(cq);
+ gpr_mu_unlock(cq->mu);
+ } else {
+ gpr_mu_unlock(cq->mu);
+ }
+
+ GRPC_ERROR_UNREF(error);
+
+ (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success);
+}
+
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
@@ -1233,6 +1336,40 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) {
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}
+static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
+ cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
+ auto* callback = cqd->shutdown_callback;
+
+ GPR_ASSERT(cqd->shutdown_called);
+
+ cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
+ callback->Run(true);
+}
+
+static void cq_shutdown_callback(grpc_completion_queue* cq) {
+ cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
+
+ /* Need an extra ref for cq here because:
+ * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
+ * Pollset shutdown decrements the cq ref count which can potentially destroy
+ * the cq (if that happens to be the last ref).
+ * Creating an extra ref here prevents the cq from getting destroyed while
+ * this function is still active */
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
+ gpr_mu_lock(cq->mu);
+ if (cqd->shutdown_called) {
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
+ return;
+ }
+ cqd->shutdown_called = true;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_callback(cq);
+ }
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
+}
+
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 84446a4d92..a7c524d8e8 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -25,6 +25,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/iomgr/pollset.h"
/* These trace flags default to 1. The corresponding lines are only traced
@@ -47,6 +48,23 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
+/// For callback CQs, the tag that is passed in for an operation must
+/// actually be a pointer to an implementation of the following class.
+/// When the operation completes, the tag will be typecasted from void*
+/// to grpc_core::CQCallbackInterface* and then the Run method will be
+/// invoked on it. In practice, the language binding (e.g., C++ API
+/// implementation) is responsible for providing and using an implementation
+/// of this abstract base class.
+namespace grpc_core {
+class CQCallbackInterface {
+ public:
+ virtual ~CQCallbackInterface() {}
+ virtual void Run(bool) GRPC_ABSTRACT;
+
+ GRPC_ABSTRACT_BASE_CLASS
+};
+} // namespace grpc_core
+
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason,
const char* file, int line);
@@ -87,6 +105,7 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cc);
int grpc_get_cq_poll_num(grpc_completion_queue* cc);
grpc_completion_queue* grpc_completion_queue_create_internal(
- grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
+ grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
+ grpc_core::CQCallbackInterface* shutdown_callback);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc
index 51c1183c5f..ed92dd7eba 100644
--- a/src/core/lib/surface/completion_queue_factory.cc
+++ b/src/core/lib/surface/completion_queue_factory.cc
@@ -30,8 +30,9 @@
static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attr) {
- return grpc_completion_queue_create_internal(attr->cq_completion_type,
- attr->cq_polling_type);
+ return grpc_completion_queue_create_internal(
+ attr->cq_completion_type, attr->cq_polling_type,
+ static_cast<grpc_core::CQCallbackInterface*>(attr->cq_shutdown_cb));
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@@ -60,14 +61,22 @@ const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup(
grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT,
- GRPC_CQ_DEFAULT_POLLING};
+ GRPC_CQ_DEFAULT_POLLING, nullptr};
return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
}
grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {1, GRPC_CQ_PLUCK,
- GRPC_CQ_DEFAULT_POLLING};
+ GRPC_CQ_DEFAULT_POLLING, nullptr};
+ return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
+}
+
+grpc_completion_queue* grpc_completion_queue_create_for_callback(
+ void* shutdown_callback, void* reserved) {
+ GPR_ASSERT(!reserved);
+ grpc_completion_queue_attributes attr = {
+ 2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback};
return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
}
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 7807b261d4..0ad82fed99 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -70,6 +70,11 @@ static void do_basic_init(void) {
g_initializations = 0;
}
+static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
+ return grpc_channel_stack_builder_append_filter(
+ builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr);
+}
+
static bool prepend_filter(grpc_channel_stack_builder* builder, void* arg) {
return grpc_channel_stack_builder_prepend_filter(
builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr);
@@ -77,20 +82,19 @@ static bool prepend_filter(grpc_channel_stack_builder* builder, void* arg) {
static void register_builtin_channel_init() {
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MAX,
- grpc_append_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ grpc_add_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MAX,
- grpc_append_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ grpc_add_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MAX,
- grpc_append_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ grpc_add_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MAX,
- prepend_filter, (void*)&grpc_lame_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, prepend_filter,
- (void*)&grpc_server_top_filter);
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ append_filter, (void*)&grpc_lame_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
+ (void*)&grpc_server_top_filter);
}
typedef struct grpc_plugin {
diff --git a/src/core/lib/surface/init.h b/src/core/lib/surface/init.h
index 9353208332..193f51447d 100644
--- a/src/core/lib/surface/init.h
+++ b/src/core/lib/surface/init.h
@@ -22,6 +22,5 @@
void grpc_register_security_filters(void);
void grpc_security_pre_init(void);
void grpc_security_init(void);
-int grpc_is_initialized(void);
#endif /* GRPC_CORE_LIB_SURFACE_INIT_H */
diff --git a/src/core/lib/surface/init_secure.cc b/src/core/lib/surface/init_secure.cc
index 8058aaa804..28c6f7b121 100644
--- a/src/core/lib/surface/init_secure.cc
+++ b/src/core/lib/surface/init_secure.cc
@@ -67,17 +67,14 @@ static bool maybe_prepend_server_auth_filter(
}
void grpc_register_security_filters(void) {
- // Register the auth client with a medium priority to allow the authority
+ // Register the auth client with a priority < INT_MAX to allow the authority
// filter -on which the auth filter depends- to be higher on the channel
// stack.
- grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MED,
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX - 1,
maybe_prepend_client_auth_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MED,
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX - 1,
maybe_prepend_client_auth_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MED,
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
maybe_prepend_server_auth_filter, nullptr);
}
diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc
index e92fe2c5a1..a44f9acdc3 100644
--- a/src/core/lib/surface/version.cc
+++ b/src/core/lib/surface/version.cc
@@ -25,4 +25,4 @@
const char* grpc_version_string(void) { return "6.0.0-dev"; }
-const char* grpc_g_stands_for(void) { return "glider"; }
+const char* grpc_g_stands_for(void) { return "gao"; }
diff --git a/src/core/lib/transport/service_config.cc b/src/core/lib/transport/service_config.cc
index e1a55d98ab..405e336028 100644
--- a/src/core/lib/transport/service_config.cc
+++ b/src/core/lib/transport/service_config.cc
@@ -65,8 +65,8 @@ const char* ServiceConfig::GetLoadBalancingPolicyName() const {
return lb_policy_name;
}
-size_t ServiceConfig::CountNamesInMethodConfig(grpc_json* json) {
- size_t num_names = 0;
+int ServiceConfig::CountNamesInMethodConfig(grpc_json* json) {
+ int num_names = 0;
for (grpc_json* field = json->child; field != nullptr; field = field->next) {
if (field->key != nullptr && strcmp(field->key, "name") == 0) {
if (field->type != GRPC_JSON_ARRAY) return -1;
diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h
index a65b267d46..2c0dd75845 100644
--- a/src/core/lib/transport/service_config.h
+++ b/src/core/lib/transport/service_config.h
@@ -103,7 +103,7 @@ class ServiceConfig {
ServiceConfig(UniquePtr<char> json_string, grpc_json* json_tree);
// Returns the number of names specified in the method config \a json.
- static size_t CountNamesInMethodConfig(grpc_json* json);
+ static int CountNamesInMethodConfig(grpc_json* json);
// Returns a path string for the JSON name object specified by \a json.
// Returns null on error.
@@ -188,9 +188,9 @@ ServiceConfig::CreateMethodConfigTable(CreateValue<T> create_value) {
// Find number of entries.
for (grpc_json* method = field->child; method != nullptr;
method = method->next) {
- size_t count = CountNamesInMethodConfig(method);
+ int count = CountNamesInMethodConfig(method);
if (count <= 0) return nullptr;
- num_entries += count;
+ num_entries += static_cast<size_t>(count);
}
// Populate method config table entries.
entries = static_cast<typename SliceHashTable<RefCountedPtr<T>>::Entry*>(