aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/gprpp/thd.h95
-rw-r--r--src/core/lib/gprpp/thd_posix.cc221
-rw-r--r--src/core/lib/gprpp/thd_windows.cc156
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc21
-rw-r--r--src/core/lib/iomgr/executor.cc8
-rw-r--r--src/core/lib/iomgr/timer_manager.cc4
-rw-r--r--src/core/lib/profiling/basic_timers.cc6
-rw-r--r--src/core/tsi/alts_transport_security.cc1
-rw-r--r--test/core/end2end/fixtures/http_proxy_fixture.cc4
-rw-r--r--test/core/end2end/fixtures/proxy.cc4
-rw-r--r--test/core/gpr/arena_test.cc3
-rw-r--r--test/core/gpr/cpu_test.cc4
-rw-r--r--test/core/gpr/mpscq_test.cc8
-rw-r--r--test/core/gpr/spinlock_test.cc4
-rw-r--r--test/core/gpr/sync_test.cc6
-rw-r--r--test/core/gpr/tls_test.cc3
-rw-r--r--test/core/gprpp/thd_test.cc6
-rw-r--r--test/core/iomgr/combiner_test.cc5
-rw-r--r--test/core/iomgr/ev_epollsig_linux_test.cc3
-rw-r--r--test/core/iomgr/resolve_address_posix_test.cc3
-rw-r--r--test/core/surface/completion_queue_threading_test.cc7
-rw-r--r--test/core/surface/concurrent_connectivity_test.cc14
22 files changed, 330 insertions, 256 deletions
diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h
index f45e78e7f6..2f86893519 100644
--- a/src/core/lib/gprpp/thd.h
+++ b/src/core/lib/gprpp/thd.h
@@ -28,34 +28,105 @@
#include <grpc/support/thd_id.h>
#include <grpc/support/time.h>
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/memory.h"
+
namespace grpc_core {
+namespace internal {
+
+/// Base class for platform-specific thread-state
+class ThreadInternalsInterface {
+ public:
+ virtual ~ThreadInternalsInterface() {}
+ virtual void Start() GRPC_ABSTRACT;
+ virtual void Join() GRPC_ABSTRACT;
+ GRPC_ABSTRACT_BASE_CLASS
+};
+} // namespace internal
class Thread {
public:
/// Default constructor only to allow use in structs that lack constructors
/// Does not produce a validly-constructed thread; must later
/// use placement new to construct a real thread. Does not init mu_ and cv_
- Thread() : real_(false), alive_(false), started_(false), joined_(false) {}
+ Thread() : state_(FAKE), impl_(nullptr) {}
+ /// Normal constructor to create a thread with name \a thd_name,
+ /// which will execute a thread based on function \a thd_body
+ /// with argument \a arg once it is started.
+ /// The optional \a success argument indicates whether the thread
+ /// is successfully created.
Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success = nullptr);
- ~Thread();
- void Start();
- void Join();
+ /// Move constructor for thread. After this is called, the other thread
+ /// no longer represents a living thread object
+ Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) {
+ other.state_ = MOVED;
+ other.impl_ = nullptr;
+ }
+
+ /// Move assignment operator for thread. After this is called, the other
+ /// thread no longer represents a living thread object. Not allowed if this
+ /// thread actually exists
+ Thread& operator=(Thread&& other) {
+ if (this != &other) {
+ // TODO(vjpai): if we can be sure that all Thread's are actually
+ // constructed, then we should assert GPR_ASSERT(impl_ == nullptr) here.
+ // However, as long as threads come in structures that are
+ // allocated via gpr_malloc, this will not be the case, so we cannot
+ // assert it for the time being.
+ state_ = other.state_;
+ impl_ = other.impl_;
+ other.state_ = MOVED;
+ other.impl_ = nullptr;
+ }
+ return *this;
+ }
+
+ /// The destructor is strictly optional; either the thread never came to life
+ /// and the constructor itself killed it or it has already been joined and
+ /// the Join function kills it. The destructor shouldn't have to do anything.
+ ~Thread() { GPR_ASSERT(impl_ == nullptr); }
+
+ void Start() {
+ if (impl_ != nullptr) {
+ GPR_ASSERT(state_ == ALIVE);
+ state_ = STARTED;
+ impl_->Start();
+ } else {
+ GPR_ASSERT(state_ == FAILED);
+ }
+ };
+ void Join() {
+ if (impl_ != nullptr) {
+ GPR_ASSERT(state_ == STARTED);
+ impl_->Join();
+ grpc_core::Delete(impl_);
+ state_ = DONE;
+ impl_ = nullptr;
+ } else {
+ GPR_ASSERT(state_ == FAILED);
+ }
+ };
static void Init();
static bool AwaitAll(gpr_timespec deadline);
private:
- gpr_mu mu_;
- gpr_cv ready_;
-
- gpr_thd_id id_;
- bool real_;
- bool alive_;
- bool started_;
- bool joined_;
+ Thread(const Thread&) = delete;
+ Thread& operator=(const Thread&) = delete;
+
+ /// The thread states are as follows:
+ /// FAKE -- just a dummy placeholder Thread created by the default constructor
+ /// ALIVE -- an actual thread of control exists associated with this thread
+ /// STARTED -- the thread of control has been started
+ /// DONE -- the thread of control has completed and been joined
+ /// FAILED -- the thread of control never came alive
+ /// MOVED -- contents were moved out and we're no longer tracking them
+ enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED };
+ ThreadState state_;
+ internal::ThreadInternalsInterface* impl_;
};
} // namespace grpc_core
diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc
index 4ded4d3fd5..28e47f1aa9 100644
--- a/src/core/lib/gprpp/thd_posix.cc
+++ b/src/core/lib/gprpp/thd_posix.cc
@@ -34,6 +34,7 @@
#include "src/core/lib/gpr/fork.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
namespace {
@@ -42,131 +43,142 @@ gpr_cv g_cv;
int g_thread_count;
int g_awaiting_threads;
+class ThreadInternalsPosix;
struct thd_arg {
- Thread* thread;
+ ThreadInternalsPosix* thread;
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
const char* name; /* name of thread. Can be nullptr. */
};
-/*****************************************
- * Only used when fork support is enabled
- */
+class ThreadInternalsPosix
+ : public grpc_core::internal::ThreadInternalsInterface {
+ public:
+ ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
+ void* arg, bool* success)
+ : started_(false) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&ready_);
+ pthread_attr_t attr;
+ /* don't use gpr_malloc as we may cause an infinite recursion with
+ * the profiling code */
+ thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
+ GPR_ASSERT(info != nullptr);
+ info->thread = this;
+ info->body = thd_body;
+ info->arg = arg;
+ info->name = thd_name;
+ inc_thd_count();
+
+ GPR_ASSERT(pthread_attr_init(&attr) == 0);
+ GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
+ 0);
+
+ *success =
+ (pthread_create(&pthread_id_, &attr,
+ [](void* v) -> void* {
+ thd_arg arg = *static_cast<thd_arg*>(v);
+ free(v);
+ if (arg.name != nullptr) {
+#if GPR_APPLE_PTHREAD_NAME
+ /* Apple supports 64 characters, and will
+ * truncate if it's longer. */
+ pthread_setname_np(arg.name);
+#elif GPR_LINUX_PTHREAD_NAME
+ /* Linux supports 16 characters max, and will
+ * error if it's longer. */
+ char buf[16];
+ size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
+ strncpy(buf, arg.name, buf_len);
+ buf[buf_len] = '\0';
+ pthread_setname_np(pthread_self(), buf);
+#endif // GPR_APPLE_PTHREAD_NAME
+ }
+
+ gpr_mu_lock(&arg.thread->mu_);
+ while (!arg.thread->started_) {
+ gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ gpr_mu_unlock(&arg.thread->mu_);
+
+ (*arg.body)(arg.arg);
+ dec_thd_count();
+ return nullptr;
+ },
+ info) == 0);
+
+ GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
+
+ if (!success) {
+ /* don't use gpr_free, as this was allocated using malloc (see above) */
+ free(info);
+ dec_thd_count();
+ }
+ };
-void inc_thd_count() {
- if (grpc_fork_support_enabled()) {
- gpr_mu_lock(&g_mu);
- g_thread_count++;
- gpr_mu_unlock(&g_mu);
+ ~ThreadInternalsPosix() override {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&ready_);
}
-}
-void dec_thd_count() {
- if (grpc_fork_support_enabled()) {
- gpr_mu_lock(&g_mu);
- g_thread_count--;
- if (g_awaiting_threads && g_thread_count == 0) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
+ void Start() override {
+ gpr_mu_lock(&mu_);
+ started_ = true;
+ gpr_cv_signal(&ready_);
+ gpr_mu_unlock(&mu_);
}
-}
-} // namespace
+ void Join() override { pthread_join(pthread_id_, nullptr); }
-Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
- bool* success)
- : real_(true), alive_(false), started_(false), joined_(false) {
- gpr_mu_init(&mu_);
- gpr_cv_init(&ready_);
- pthread_attr_t attr;
- /* don't use gpr_malloc as we may cause an infinite recursion with
- * the profiling code */
- thd_arg* a = static_cast<thd_arg*>(malloc(sizeof(*a)));
- GPR_ASSERT(a != nullptr);
- a->thread = this;
- a->body = thd_body;
- a->arg = arg;
- a->name = thd_name;
- inc_thd_count();
-
- GPR_ASSERT(pthread_attr_init(&attr) == 0);
- GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0);
-
- pthread_t p;
- alive_ = (pthread_create(&p, &attr,
- [](void* v) -> void* {
- thd_arg a = *static_cast<thd_arg*>(v);
- free(v);
- if (a.name != nullptr) {
-#if GPR_APPLE_PTHREAD_NAME
- /* Apple supports 64 characters, and will
- * truncate if it's longer. */
- pthread_setname_np(a.name);
-#elif GPR_LINUX_PTHREAD_NAME
- /* Linux supports 16 characters max, and will
- * error if it's longer. */
- char buf[16];
- size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
- strncpy(buf, a.name, buf_len);
- buf[buf_len] = '\0';
- pthread_setname_np(pthread_self(), buf);
-#endif // GPR_APPLE_PTHREAD_NAME
- }
+ private:
+ /*****************************************
+ * Only used when fork support is enabled
+ */
- gpr_mu_lock(&a.thread->mu_);
- if (!a.thread->started_) {
- gpr_cv_wait(&a.thread->ready_, &a.thread->mu_,
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
- }
- gpr_mu_unlock(&a.thread->mu_);
-
- (*a.body)(a.arg);
- dec_thd_count();
- return nullptr;
- },
- a) == 0);
+ static void inc_thd_count() {
+ if (grpc_fork_support_enabled()) {
+ gpr_mu_lock(&g_mu);
+ g_thread_count++;
+ gpr_mu_unlock(&g_mu);
+ }
+ }
- if (success != nullptr) {
- *success = alive_;
+ static void dec_thd_count() {
+ if (grpc_fork_support_enabled()) {
+ gpr_mu_lock(&g_mu);
+ g_thread_count--;
+ if (g_awaiting_threads && g_thread_count == 0) {
+ gpr_cv_signal(&g_cv);
+ }
+ gpr_mu_unlock(&g_mu);
+ }
}
- id_ = gpr_thd_id(p);
- GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
+ gpr_mu mu_;
+ gpr_cv ready_;
+ bool started_;
+ pthread_t pthread_id_;
+};
- if (!alive_) {
- /* don't use gpr_free, as this was allocated using malloc (see above) */
- free(a);
- dec_thd_count();
- }
-}
+} // namespace
-Thread::~Thread() {
- if (!alive_) {
- // This thread never existed, so nothing to do
+Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
+ bool* success) {
+ bool outcome;
+ impl_ =
+ grpc_core::New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome);
+ if (outcome) {
+ state_ = ALIVE;
} else {
- GPR_ASSERT(joined_);
- }
- if (real_) {
- gpr_mu_destroy(&mu_);
- gpr_cv_destroy(&ready_);
+ state_ = FAILED;
+ grpc_core::Delete(impl_);
+ impl_ = nullptr;
}
-}
-void Thread::Start() {
- gpr_mu_lock(&mu_);
- if (alive_) {
- started_ = true;
- gpr_cv_signal(&ready_);
- }
- gpr_mu_unlock(&mu_);
-}
-
-void Thread::Join() {
- if (alive_) {
- pthread_join(pthread_t(id_), nullptr);
+ if (success != nullptr) {
+ *success = outcome;
}
- joined_ = true;
}
void Thread::Init() {
@@ -180,7 +192,8 @@ bool Thread::AwaitAll(gpr_timespec deadline) {
gpr_mu_lock(&g_mu);
g_awaiting_threads = 1;
int res = 0;
- if (g_thread_count > 0) {
+ while ((g_thread_count > 0) &&
+ (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) {
res = gpr_cv_wait(&g_cv, &g_mu, deadline);
}
g_awaiting_threads = 0;
diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc
index efbed30ac6..e13c2f63d1 100644
--- a/src/core/lib/gprpp/thd_windows.cc
+++ b/src/core/lib/gprpp/thd_windows.cc
@@ -29,6 +29,8 @@
#include <grpc/support/thd_id.h>
#include <string.h>
+#include "src/core/lib/gprpp/memory.h"
+
#if defined(_MSC_VER)
#define thread_local __declspec(thread)
#define WIN_LAMBDA
@@ -40,8 +42,9 @@
#endif
namespace {
+class ThreadInternalsWindows;
struct thd_info {
- grpc_core::Thread* thread;
+ ThreadInternalsWindows* thread;
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
HANDLE join_event; /* the join event */
@@ -54,6 +57,77 @@ void destroy_thread(struct thd_info* t) {
CloseHandle(t->join_event);
gpr_free(t);
}
+
+class ThreadInternalsWindows
+ : public grpc_core::internal::ThreadInternalsInterface {
+ public:
+ ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg,
+ bool* success) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&ready_);
+
+ HANDLE handle;
+ info_ = (struct thd_info*)gpr_malloc(sizeof(*info_));
+ info->thread = this;
+ info->body = thd_body;
+ info->arg = arg;
+
+ info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
+ if (info->join_event == nullptr) {
+ gpr_free(info_);
+ *success = false;
+ } else {
+ handle = CreateThread(
+ nullptr, 64 * 1024,
+ [](void* v) WIN_LAMBDA -> DWORD {
+ g_thd_info = static_cast<thd_info*>(v);
+ gpr_mu_lock(&g_thd_info->thread->mu_);
+ while (!g_thd_info->thread->started_) {
+ gpr_cv_wait(&g_thd_info->thread->ready_, &g_thd_info->thread->mu_,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ gpr_mu_unlock(&g_thd_info->thread->mu_);
+ g_thd_info->body(g_thd_info->arg);
+ BOOL ret = SetEvent(g_thd_info->join_event);
+ GPR_ASSERT(ret);
+ return 0;
+ },
+ info, 0, nullptr);
+ if (handle == nullptr) {
+ destroy_thread(info_);
+ *success_ = false;
+ } else {
+ CloseHandle(handle);
+ *success = true;
+ }
+ }
+ }
+
+ ~ThreadInternalsWindows() override {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&ready_);
+ }
+
+ void Start() override {
+ gpr_mu_lock(&mu_);
+ started_ = true;
+ gpr_cv_signal(&ready_);
+ gpr_mu_unlock(&mu_);
+ }
+
+ void Join() override {
+ DWORD ret = WaitForSingleObject(info_->join_event, INFINITE);
+ GPR_ASSERT(ret == WAIT_OBJECT_0);
+ destroy_thread(info_);
+ }
+
+ private:
+ gpr_mu mu_;
+ gpr_cv ready_;
+ bool started_;
+ thd_info* info_;
+};
+
} // namespace
namespace grpc_core {
@@ -66,82 +140,22 @@ bool Thread::AwaitAll(gpr_timespec deadline) {
}
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
- bool* success)
- : real_(true), alive_(false), started_(false), joined_(false) {
- gpr_mu_init(&mu_);
- gpr_cv_init(&ready_);
-
- HANDLE handle;
- struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info));
- info->thread = this;
- info->body = thd_body;
- info->arg = arg;
-
- info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
- if (info->join_event == nullptr) {
- gpr_free(info);
- alive_ = false;
+ bool* success) {
+ bool outcome;
+ impl_ = grpc_core::New<ThreadInternalsWindows>(thd_body, arg, &outcome);
+ if (outcome) {
+ state_ = ALIVE;
} else {
- handle = CreateThread(nullptr, 64 * 1024,
- [](void* v) WIN_LAMBDA -> DWORD {
- g_thd_info = static_cast<thd_info*>(v);
- gpr_mu_lock(&g_thd_info->thread->mu_);
- if (!g_thd_info->thread->started_) {
- gpr_cv_wait(&g_thd_info->thread->ready_,
- &g_thd_info->thread->mu_,
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
- }
- gpr_mu_unlock(&g_thd_info->thread->mu_);
- g_thd_info->body(g_thd_info->arg);
- BOOL ret = SetEvent(g_thd_info->join_event);
- GPR_ASSERT(ret);
- return 0;
- },
- info, 0, nullptr);
- if (handle == nullptr) {
- destroy_thread(info);
- alive_ = false;
- } else {
- id_ = (gpr_thd_id)info;
- CloseHandle(handle);
- alive_ = true;
- }
- }
- if (success != nullptr) {
- *success = alive_;
- }
-}
-
-Thread::~Thread() {
- if (!alive_) {
- // This thread never existed, so nothing to do
- } else {
- GPR_ASSERT(joined_);
- }
- if (real_) {
- gpr_mu_destroy(&mu_);
- gpr_cv_destroy(&ready_);
+ state_ = FAILED;
+ grpc_core::Delete(impl_);
+ impl_ = nullptr;
}
-}
-void Thread::Start() {
- gpr_mu_lock(&mu_);
- if (alive_) {
- started_ = true;
- gpr_cv_signal(&ready_);
+ if (success != nullptr) {
+ *success = outcome;
}
- gpr_mu_unlock(&mu_);
}
-void Thread::Join() {
- if (alive_) {
- thd_info* info = (thd_info*)id_;
- DWORD ret = WaitForSingleObject(info->join_event, INFINITE);
- GPR_ASSERT(ret == WAIT_OBJECT_0);
- destroy_thread(info);
- }
- joined_ = true;
-}
} // namespace grpc_core
gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; }
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 4fc8ce9ece..6120f9f44b 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -31,7 +31,6 @@
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -259,7 +258,9 @@ typedef struct poll_args {
grpc_core::Thread poller_thd;
gpr_cv trigger;
int trigger_set;
+ bool harvestable;
gpr_cv harvest;
+ bool joinable;
gpr_cv join;
struct pollfd* fds;
nfds_t nfds;
@@ -1372,6 +1373,8 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
gpr_cv_init(&pargs->trigger);
gpr_cv_init(&pargs->harvest);
gpr_cv_init(&pargs->join);
+ pargs->harvestable = false;
+ pargs->joinable = false;
pargs->fds = fds;
pargs->nfds = count;
pargs->next = nullptr;
@@ -1380,7 +1383,7 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
init_result(pargs);
cache_poller_locked(pargs);
gpr_ref(&g_cvfds.pollcount);
- new (&pargs->poller_thd) grpc_core::Thread("grpc_poller", &run_poll, pargs);
+ pargs->poller_thd = grpc_core::Thread("grpc_poller", &run_poll, pargs);
pargs->poller_thd.Start();
return pargs;
}
@@ -1464,10 +1467,13 @@ static void cache_harvest_locked() {
if (poll_cache.dead_pollers) {
poll_cache.dead_pollers->prev = nullptr;
}
+ args->harvestable = true;
gpr_cv_signal(&args->harvest);
- gpr_cv_wait(&args->join, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ while (!args->joinable) {
+ gpr_cv_wait(&args->join, &g_cvfds.mu,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
args->poller_thd.Join();
- args->poller_thd.~Thread();
gpr_free(args);
}
}
@@ -1533,8 +1539,11 @@ static void run_poll(void* args) {
if (gpr_unref(&g_cvfds.pollcount)) {
gpr_cv_signal(&g_cvfds.shutdown_cv);
}
- gpr_cv_wait(&pargs->harvest, &g_cvfds.mu,
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ while (!pargs->harvestable) {
+ gpr_cv_wait(&pargs->harvest, &g_cvfds.mu,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ pargs->joinable = true;
gpr_cv_signal(&pargs->join);
gpr_mu_unlock(&g_cvfds.mu);
}
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index 74e530e898..b017db53f8 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -21,7 +21,6 @@
#include "src/core/lib/iomgr/executor.h"
#include <string.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
@@ -102,11 +101,11 @@ void grpc_executor_set_threading(bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_init(&g_thread_state[i].mu);
gpr_cv_init(&g_thread_state[i].cv);
- new (&g_thread_state[i].thd) grpc_core::Thread();
+ g_thread_state[i].thd = grpc_core::Thread();
g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
}
- new (&g_thread_state[0].thd)
+ g_thread_state[0].thd =
grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]);
g_thread_state[0].thd.Start();
} else {
@@ -126,7 +125,6 @@ void grpc_executor_set_threading(bool threading) {
}
gpr_atm_no_barrier_store(&g_cur_threads, 0);
for (size_t i = 0; i < g_max_threads; i++) {
- g_thread_state[i].thd.~Thread();
gpr_mu_destroy(&g_thread_state[i].mu);
gpr_cv_destroy(&g_thread_state[i].cv);
run_closures(g_thread_state[i].elems);
@@ -266,7 +264,7 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
if (cur_thread_count < g_max_threads) {
gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
- new (&g_thread_state[cur_thread_count].thd)
+ g_thread_state[cur_thread_count].thd =
grpc_core::Thread("grpc_executor", executor_thread,
&g_thread_state[cur_thread_count]);
g_thread_state[cur_thread_count].thd.Start();
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index 7efbaa8364..94f288af27 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include <inttypes.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -69,7 +68,6 @@ static void gc_completed_threads(void) {
gpr_mu_unlock(&g_mu);
while (to_gc != nullptr) {
to_gc->thd.Join();
- to_gc->thd.~Thread();
completed_thread* next = to_gc->next;
gpr_free(to_gc);
to_gc = next;
@@ -88,7 +86,7 @@ static void start_timer_thread_and_unlock(void) {
}
completed_thread* ct =
static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
- new (&ct->thd) grpc_core::Thread("grpc_global_timer", timer_thread, ct);
+ ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
ct->thd.Start();
}
diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc
index 97646d1000..43384fd0ca 100644
--- a/src/core/lib/profiling/basic_timers.cc
+++ b/src/core/lib/profiling/basic_timers.cc
@@ -183,7 +183,7 @@ static void finish_writing(void) {
pthread_cond_signal(&g_cv);
pthread_mutex_unlock(&g_mu);
g_writing_thread->Join();
- delete g_writing_thread;
+ grpc_core::Delete(g_writing_thread);
gpr_log(GPR_INFO, "flushing logs");
@@ -202,8 +202,8 @@ void gpr_timers_set_log_filename(const char* filename) {
}
static void init_output() {
- g_writing_thread =
- new grpc_core::Thread("timer_output_thread", writing_thread, nullptr);
+ g_writing_thread = grpc_core::New<grpc_core::Thread>("timer_output_thread",
+ writing_thread, nullptr);
atexit(finish_writing);
}
diff --git a/src/core/tsi/alts_transport_security.cc b/src/core/tsi/alts_transport_security.cc
index 67798718b7..2fd408103b 100644
--- a/src/core/tsi/alts_transport_security.cc
+++ b/src/core/tsi/alts_transport_security.cc
@@ -57,7 +57,6 @@ void grpc_tsi_alts_shutdown() {
grpc_completion_queue_destroy(g_alts_resource.cq);
grpc_channel_destroy(g_alts_resource.channel);
g_alts_resource.thread.Join();
- g_alts_resource.thread.~Thread();
}
gpr_cv_destroy(&g_alts_resource.cv);
gpr_mu_destroy(&g_alts_resource.mu);
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index d074fdaa52..58353376f3 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -21,7 +21,6 @@
#include "src/core/lib/iomgr/sockaddr.h"
#include <string.h>
-#include <new>
#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>
@@ -551,7 +550,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
grpc_tcp_server_start(proxy->server, &proxy->pollset, 1, on_accept, proxy);
// Start proxy thread.
- new (&proxy->thd) grpc_core::Thread("grpc_http_proxy", thread_main, proxy);
+ proxy->thd = grpc_core::Thread("grpc_http_proxy", thread_main, proxy);
proxy->thd.Start();
return proxy;
}
@@ -566,7 +565,6 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
gpr_unref(&proxy->users); // Signal proxy thread to shutdown.
grpc_core::ExecCtx exec_ctx;
proxy->thd.Join();
- proxy->thd.~Thread();
grpc_tcp_server_shutdown_listeners(proxy->server);
grpc_tcp_server_unref(proxy->server);
gpr_free(proxy->proxy_name);
diff --git a/test/core/end2end/fixtures/proxy.cc b/test/core/end2end/fixtures/proxy.cc
index 5b4f3b9236..042c858b4c 100644
--- a/test/core/end2end/fixtures/proxy.cc
+++ b/test/core/end2end/fixtures/proxy.cc
@@ -19,7 +19,6 @@
#include "test/core/end2end/fixtures/proxy.h"
#include <string.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -98,7 +97,7 @@ grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def,
grpc_server_start(proxy->server);
grpc_call_details_init(&proxy->new_call_details);
- new (&proxy->thd) grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
+ proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
proxy->thd.Start();
request_call(proxy);
@@ -123,7 +122,6 @@ void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
new_closure(shutdown_complete, proxy));
proxy->thd.Join();
- proxy->thd.~Thread();
gpr_free(proxy->proxy_port);
gpr_free(proxy->server_port);
grpc_server_destroy(proxy->server);
diff --git a/test/core/gpr/arena_test.cc b/test/core/gpr/arena_test.cc
index b00c014cea..111414ea3e 100644
--- a/test/core/gpr/arena_test.cc
+++ b/test/core/gpr/arena_test.cc
@@ -20,7 +20,6 @@
#include <inttypes.h>
#include <string.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -102,7 +101,7 @@ static void concurrent_test(void) {
grpc_core::Thread thds[CONCURRENT_TEST_THREADS];
for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) {
- new (&thds[i])
+ thds[i] =
grpc_core::Thread("grpc_concurrent_test", concurrent_test_body, &args);
thds[i].Start();
}
diff --git a/test/core/gpr/cpu_test.cc b/test/core/gpr/cpu_test.cc
index 279e6e6f5a..1052d40b42 100644
--- a/test/core/gpr/cpu_test.cc
+++ b/test/core/gpr/cpu_test.cc
@@ -25,7 +25,6 @@
#include <stdio.h>
#include <string.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -118,7 +117,7 @@ static void cpu_test(void) {
static_cast<grpc_core::Thread*>(gpr_malloc(sizeof(*thd) * nthreads));
for (i = 0; i < nthreads; i++) {
- new (&thd[i]) grpc_core::Thread("grpc_cpu_test", &worker_thread, &ct);
+ thd[i] = grpc_core::Thread("grpc_cpu_test", &worker_thread, &ct);
thd[i].Start();
}
gpr_mu_lock(&ct.mu);
@@ -128,7 +127,6 @@ static void cpu_test(void) {
gpr_mu_unlock(&ct.mu);
for (i = 0; i < nthreads; i++) {
thd[i].Join();
- thd[i].~Thread();
}
gpr_free(thd);
fprintf(stderr, "Saw cores [");
diff --git a/test/core/gpr/mpscq_test.cc b/test/core/gpr/mpscq_test.cc
index 1e929fcf33..8c0873941f 100644
--- a/test/core/gpr/mpscq_test.cc
+++ b/test/core/gpr/mpscq_test.cc
@@ -19,7 +19,6 @@
#include "src/core/lib/gpr/mpscq.h"
#include <stdlib.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -85,7 +84,7 @@ static void test_mt(void) {
ta[i].ctr = 0;
ta[i].q = &q;
ta[i].start = &start;
- new (&thds[i]) grpc_core::Thread("grpc_mt_test", test_thread, &ta[i]);
+ thds[i] = grpc_core::Thread("grpc_mt_test", test_thread, &ta[i]);
thds[i].Start();
}
size_t num_done = 0;
@@ -155,7 +154,7 @@ static void test_mt_multipop(void) {
ta[i].ctr = 0;
ta[i].q = &q;
ta[i].start = &start;
- new (&thds[i]) grpc_core::Thread("grpc_multipop_test", test_thread, &ta[i]);
+ thds[i] = grpc_core::Thread("grpc_multipop_test", test_thread, &ta[i]);
thds[i].Start();
}
pull_args pa;
@@ -167,8 +166,7 @@ static void test_mt_multipop(void) {
pa.start = &start;
gpr_mu_init(&pa.mu);
for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {
- new (&pull_thds[i])
- grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa);
+ pull_thds[i] = grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa);
pull_thds[i].Start();
}
gpr_event_set(&start, (void*)1);
diff --git a/test/core/gpr/spinlock_test.cc b/test/core/gpr/spinlock_test.cc
index ac9f70f301..0ee72edb15 100644
--- a/test/core/gpr/spinlock_test.cc
+++ b/test/core/gpr/spinlock_test.cc
@@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -69,7 +68,7 @@ static void test_destroy(struct test* m) {
static void test_create_threads(struct test* m, void (*body)(void* arg)) {
int i;
for (i = 0; i != m->thread_count; i++) {
- new (&m->threads[i]) grpc_core::Thread("grpc_create_threads", body, m);
+ m->threads[i] = grpc_core::Thread("grpc_create_threads", body, m);
m->threads[i].Start();
}
}
@@ -79,7 +78,6 @@ static void test_wait(struct test* m) {
int i;
for (i = 0; i != m->thread_count; i++) {
m->threads[i].Join();
- m->threads[i].~Thread();
}
}
diff --git a/test/core/gpr/sync_test.cc b/test/core/gpr/sync_test.cc
index 487f394b14..24b4562819 100644
--- a/test/core/gpr/sync_test.cc
+++ b/test/core/gpr/sync_test.cc
@@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
-#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -196,7 +195,7 @@ static void test_destroy(struct test* m) {
static void test_create_threads(struct test* m, void (*body)(void* arg)) {
int i;
for (i = 0; i != m->nthreads; i++) {
- new (&m->threads[i]) grpc_core::Thread("grpc_create_threads", body, m);
+ m->threads[i] = grpc_core::Thread("grpc_create_threads", body, m);
m->threads[i].Start();
}
}
@@ -210,7 +209,6 @@ static void test_wait(struct test* m) {
gpr_mu_unlock(&m->mu);
for (int i = 0; i != m->nthreads; i++) {
m->threads[i].Join();
- m->threads[i].~Thread();
}
}
@@ -258,7 +256,7 @@ static void test(const char* name, void (*body)(void* m),
m = test_new(10, iterations, incr_step);
grpc_core::Thread extra_thd;
if (extra != nullptr) {
- new (&extra_thd) grpc_core::Thread(name, extra, m);
+ extra_thd = grpc_core::Thread(name, extra, m);
extra_thd.Start();
m->done++; /* one more thread to wait for */
}
diff --git a/test/core/gpr/tls_test.cc b/test/core/gpr/tls_test.cc
index a060cd47f1..0502fc7ef4 100644
--- a/test/core/gpr/tls_test.cc
+++ b/test/core/gpr/tls_test.cc
@@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
-#include <new>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -56,7 +55,7 @@ int main(int argc, char* argv[]) {
gpr_tls_init(&test_var);
for (auto& th : threads) {
- new (&th) grpc_core::Thread("grpc_tls_test", thd_body, nullptr);
+ th = grpc_core::Thread("grpc_tls_test", thd_body, nullptr);
th.Start();
}
for (auto& th : threads) {
diff --git a/test/core/gprpp/thd_test.cc b/test/core/gprpp/thd_test.cc
index a126784e72..82dd681049 100644
--- a/test/core/gprpp/thd_test.cc
+++ b/test/core/gprpp/thd_test.cc
@@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
-#include <new>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -60,7 +59,7 @@ static void test1(void) {
t.n = NUM_THREADS;
t.is_done = 0;
for (auto& th : thds) {
- new (&th) grpc_core::Thread("grpc_thread_body1_test", &thd_body1, &t);
+ th = grpc_core::Thread("grpc_thread_body1_test", &thd_body1, &t);
th.Start();
}
gpr_mu_lock(&t.mu);
@@ -81,8 +80,7 @@ static void test2(void) {
grpc_core::Thread thds[NUM_THREADS];
for (auto& th : thds) {
bool ok;
- new (&th)
- grpc_core::Thread("grpc_thread_body2_test", &thd_body2, nullptr, &ok);
+ th = grpc_core::Thread("grpc_thread_body2_test", &thd_body2, nullptr, &ok);
GPR_ASSERT(ok);
th.Start();
}
diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc
index e4f4783e58..cf2c7db846 100644
--- a/test/core/iomgr/combiner_test.cc
+++ b/test/core/iomgr/combiner_test.cc
@@ -18,8 +18,6 @@
#include "src/core/lib/iomgr/combiner.h"
-#include <new>
-
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -105,8 +103,7 @@ static void test_execute_many(void) {
ta[i].ctr = 0;
ta[i].lock = lock;
gpr_event_init(&ta[i].done);
- new (&thds[i])
- grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]);
+ thds[i] = grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]);
thds[i].Start();
}
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc
index 4ec6fc8b0d..c3ba6d7c14 100644
--- a/test/core/iomgr/ev_epollsig_linux_test.cc
+++ b/test/core/iomgr/ev_epollsig_linux_test.cc
@@ -25,7 +25,6 @@
#include <errno.h>
#include <string.h>
#include <unistd.h>
-#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@@ -262,7 +261,7 @@ static void test_threading(void) {
grpc_core::Thread thds[10];
for (auto& th : thds) {
- new (&th) grpc_core::Thread("test_thread", test_threading_loop, &shared);
+ th = grpc_core::Thread("test_thread", test_threading_loop, &shared);
th.Start();
}
grpc_wakeup_fd fd;
diff --git a/test/core/iomgr/resolve_address_posix_test.cc b/test/core/iomgr/resolve_address_posix_test.cc
index 895ede24c7..79b2b50e70 100644
--- a/test/core/iomgr/resolve_address_posix_test.cc
+++ b/test/core/iomgr/resolve_address_posix_test.cc
@@ -20,7 +20,6 @@
#include <string.h>
#include <sys/un.h>
-#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@@ -106,7 +105,7 @@ static void actually_poll(void* argsp) {
static void poll_pollset_until_request_done(args_struct* args) {
gpr_atm_rel_store(&args->done_atm, 0);
- new (&args->thd) grpc_core::Thread("grpc_poll_pollset", actually_poll, args);
+ args->thd = grpc_core::Thread("grpc_poll_pollset", actually_poll, args);
args->thd.Start();
}
diff --git a/test/core/surface/completion_queue_threading_test.cc b/test/core/surface/completion_queue_threading_test.cc
index 1a76d7e6ae..9c8d8d8395 100644
--- a/test/core/surface/completion_queue_threading_test.cc
+++ b/test/core/surface/completion_queue_threading_test.cc
@@ -18,8 +18,6 @@
#include "src/core/lib/surface/completion_queue.h"
-#include <new>
-
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -96,7 +94,7 @@ static void test_too_many_plucks(void) {
}
thread_states[i].cc = cc;
thread_states[i].tag = tags[i];
- new (&threads[i])
+ threads[i] =
grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i);
threads[i].Start();
}
@@ -234,7 +232,7 @@ static void test_threading(size_t producers, size_t consumers) {
options[i].id = optid++;
bool ok;
- new (&threads[i]) grpc_core::Thread(
+ threads[i] = grpc_core::Thread(
i < producers ? "grpc_producer" : "grpc_consumer",
i < producers ? producer_thread : consumer_thread, options + i, &ok);
GPR_ASSERT(ok);
@@ -273,7 +271,6 @@ static void test_threading(size_t producers, size_t consumers) {
for (i = 0; i < producers + consumers; i++) {
threads[i].Join();
- threads[i].~Thread();
}
gpr_free(threads);
diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc
index 32b4ae1da8..c1298b6636 100644
--- a/test/core/surface/concurrent_connectivity_test.cc
+++ b/test/core/surface/concurrent_connectivity_test.cc
@@ -24,7 +24,6 @@
#include <memory.h>
#include <stdio.h>
-#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@@ -179,8 +178,7 @@ int run_concurrent_connectivity_test() {
char* localhost = gpr_strdup("localhost:54321");
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
- new (&th)
- grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost);
+ th = grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost);
th.Start();
}
for (auto& th : threads) {
@@ -204,8 +202,7 @@ int run_concurrent_connectivity_test() {
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
- new (&th)
- grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr);
+ th = grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr);
th.Start();
}
for (auto& th : threads) {
@@ -231,8 +228,7 @@ int run_concurrent_connectivity_test() {
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
- new (&th)
- grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr);
+ th = grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr);
th.Start();
}
for (auto& th : threads) {
@@ -291,8 +287,8 @@ int run_concurrent_watches_with_short_timeouts_test() {
char* localhost = gpr_strdup("localhost:54321");
for (auto& th : threads) {
- new (&th) grpc_core::Thread("grpc_short_watches",
- watches_with_short_timeouts, localhost);
+ th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts,
+ localhost);
th.Start();
}
for (auto& th : threads) {