Diffstat (limited to 'src')
-rw-r--r--  src/core/iomgr/pollset.h                              |   7
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_epoll.c       |   2
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_poll_posix.c  |   2
-rw-r--r--  src/core/iomgr/pollset_posix.c                        |  31
-rw-r--r--  src/core/iomgr/pollset_posix.h                        |   9
-rw-r--r--  src/core/iomgr/pollset_windows.c                      |   9
-rw-r--r--  src/core/security/google_default_credentials.c        |   2
-rw-r--r--  src/core/surface/completion_queue.c                   |  24
-rw-r--r--  src/cpp/server/server.cc                              | 104
-rw-r--r--  src/cpp/server/server_builder.cc                      |  29
10 files changed, 142 insertions, 77 deletions
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c474e4dbf1..337596cb74 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -74,10 +74,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
    grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
    not be released by grpc_pollset_work AFTER worker has been destroyed.
-   Returns true if some work has been done, and false if the deadline
-   expired. */
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline);
+   Tries not to block past deadline. */
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline);
 
 /* Break one polling thread out of polling work for this pollset.
    If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 1320c64579..4d41db074d 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work(
   pfds[1].events = POLLIN;
   pfds[1].revents = 0;
 
-  poll_rv = poll(pfds, 2, timeout_ms);
+  poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
 
   if (poll_rv < 0) {
     if (errno != EINTR) {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index b5b2d7534d..388b2d2a8a 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work(
                       POLLOUT, &watchers[i]);
   }
 
-  r = poll(pfds, pfd_count, timeout);
+  r = grpc_poll_function(pfds, pfd_count, timeout);
 
   for (i = 1; i < pfd_count; i++) {
     grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index d3a9193af1..6bd1b61f24 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -38,7 +38,6 @@
 #include "src/core/iomgr/pollset_posix.h"
 
 #include <errno.h>
-#include <poll.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
@@ -57,6 +56,8 @@
 GPR_TLS_DECL(g_current_thread_poller);
 GPR_TLS_DECL(g_current_thread_worker);
 
+grpc_poll_function_type grpc_poll_function = poll;
+
 static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
   worker->prev->next = worker->next;
   worker->next->prev = worker->prev;
@@ -89,6 +90,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
 }
 
 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+  /* pollset->mu already held */
   if (specific_worker != NULL) {
     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
       for (specific_worker = p->root_worker.next;
@@ -140,10 +142,10 @@ void grpc_pollset_init(grpc_pollset *pollset) {
 void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
   gpr_mu_lock(&pollset->mu);
   pollset->vtable->add_fd(pollset, fd, 1);
-  /* the following (enabled only in debug) will reacquire and then release
-     our lock - meaning that if the unlocking flag passed to del_fd above is
-     not respected, the code will deadlock (in a way that we have a chance of
-     debugging) */
+/* the following (enabled only in debug) will reacquire and then release
+   our lock - meaning that if the unlocking flag passed to del_fd above is
+   not respected, the code will deadlock (in a way that we have a chance of
+   debugging) */
 #ifndef NDEBUG
   gpr_mu_lock(&pollset->mu);
   gpr_mu_unlock(&pollset->mu);
@@ -153,10 +155,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
 void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
   gpr_mu_lock(&pollset->mu);
   pollset->vtable->del_fd(pollset, fd, 1);
-  /* the following (enabled only in debug) will reacquire and then release
-     our lock - meaning that if the unlocking flag passed to del_fd above is
-     not respected, the code will deadlock (in a way that we have a chance of
-     debugging) */
+/* the following (enabled only in debug) will reacquire and then release
+   our lock - meaning that if the unlocking flag passed to del_fd above is
+   not respected, the code will deadlock (in a way that we have a chance of
+   debugging) */
 #ifndef NDEBUG
   gpr_mu_lock(&pollset->mu);
   gpr_mu_unlock(&pollset->mu);
@@ -168,14 +170,10 @@ static void finish_shutdown(grpc_pollset *pollset) {
   pollset->shutdown_done_cb(pollset->shutdown_done_arg);
 }
 
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline) {
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline) {
   /* pollset->mu already held */
-  gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
   int added_worker = 0;
-  if (gpr_time_cmp(now, deadline) > 0) {
-    return 0;
-  }
   /* this must happen before we (potentially) drop pollset->mu */
   worker->next = worker->prev = NULL;
   /* TODO(ctiller): pool these */
@@ -217,7 +215,6 @@ done:
       gpr_mu_lock(&pollset->mu);
     }
   }
-  return 1;
 }
 
 void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
 
   /* poll fd count (argument 2) is shortened by one if we have no events
      to poll on - such that it only includes the kicker */
-  r = poll(pfd, nfds, timeout);
+  r = grpc_poll_function(pfd, nfds, timeout);
   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
 
   if (fd) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 1c1b736193..69bd9cca8c 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -34,6 +34,8 @@
 #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
 #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
 
+#include <poll.h>
+
 #include <grpc/support/sync.h>
 
 #include "src/core/iomgr/wakeup_fd_posix.h"
@@ -102,7 +104,8 @@ void grpc_kick_drain(grpc_pollset *p);
    - longer than a millisecond polls are rounded up to the next nearest
      millisecond to avoid spinning
    - infinite timeouts are converted to -1 */
-int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
+                                         gpr_timespec now);
 
 /* turn a pollset into a multipoller: platform specific */
 typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset,
@@ -117,4 +120,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
  * be locked) */
 int grpc_pollset_has_workers(grpc_pollset *pollset);
 
+/* override to allow tests to hook poll() usage */
+typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
+extern grpc_poll_function_type grpc_poll_function;
+
 #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 22dc5891c3..1078fa5384 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -100,13 +100,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
   gpr_mu_destroy(&pollset->mu);
 }
 
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) {
-  gpr_timespec now;
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline) {
   int added_worker = 0;
-  now = gpr_now(GPR_CLOCK_MONOTONIC);
-  if (gpr_time_cmp(now, deadline) > 0) {
-    return 0 /* GPR_FALSE */;
-  }
   worker->next = worker->prev = NULL;
   gpr_cv_init(&worker->cv);
   if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
@@ -127,7 +123,6 @@ done:
   if (added_worker) {
     remove_worker(pollset, worker);
   }
-  return 1 /* GPR_TRUE */;
 }
 
 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index d1f228665f..347b0da643 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(void) {
   gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
   while (!detector.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&detector.pollset, &worker,
+    grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 378b3f71a1..b58115a93f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -167,10 +167,12 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
 }
 
 grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
-                                      gpr_timespec deadline,
-                                      void *reserved) {
+                                      gpr_timespec deadline, void *reserved) {
   grpc_event ret;
   grpc_pollset_worker worker;
+  int first_loop = 1;
+  gpr_timespec now;
+
   GPR_ASSERT(!reserved);
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -197,12 +199,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+    now = gpr_now(GPR_CLOCK_MONOTONIC);
+    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
+    first_loop = 0;
+    grpc_pollset_work(&cc->pollset, &worker, now, deadline);
   }
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
   GRPC_CQ_INTERNAL_UNREF(cc, "next");
@@ -240,6 +245,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
   grpc_cq_completion *c;
   grpc_cq_completion *prev;
   grpc_pollset_worker worker;
+  gpr_timespec now;
+  int first_loop = 1;
+
   GPR_ASSERT(!reserved);
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -272,8 +280,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
       break;
     }
     if (!add_plucker(cc, tag, &worker)) {
-      gpr_log(GPR_DEBUG,
-              "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d",
+      gpr_log(GPR_DEBUG,
+              "Too many outstanding grpc_completion_queue_pluck calls: maximum "
+              "is %d",
              GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
@@ -281,13 +290,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+    now = gpr_now(GPR_CLOCK_MONOTONIC);
+    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
      del_plucker(cc, tag, &worker);
      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
      break;
    }
+    first_loop = 0;
+    grpc_pollset_work(&cc->pollset, &worker, now, deadline);
    del_plucker(cc, tag, &worker);
  }
done:
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index a70b555855..dbe60e50de 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -50,6 +50,52 @@
 
 namespace grpc {
 
+class Server::UnimplementedAsyncRequestContext {
+ protected:
+  UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
+
+  GenericServerContext server_context_;
+  GenericServerAsyncReaderWriter generic_stream_;
+};
+
+class Server::UnimplementedAsyncRequest GRPC_FINAL
+    : public UnimplementedAsyncRequestContext,
+      public GenericAsyncRequest {
+ public:
+  UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
+      : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
+                            NULL, false),
+        server_(server),
+        cq_(cq) {}
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+  ServerContext* context() { return &server_context_; }
+  GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
+
+ private:
+  Server* const server_;
+  ServerCompletionQueue* const cq_;
+};
+
+typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+    UnimplementedAsyncResponseOp;
+class Server::UnimplementedAsyncResponse GRPC_FINAL
+    : public UnimplementedAsyncResponseOp {
+ public:
+  UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
+  ~UnimplementedAsyncResponse() { delete request_; }
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+    bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
+    delete this;
+    return r;
+  }
+
+ private:
+  UnimplementedAsyncRequest* const request_;
+};
+
 class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
  public:
   bool FinalizeResult(void** tag, bool* status) {
@@ -230,11 +276,11 @@ Server::~Server() {
   delete sync_methods_;
 }
 
-bool Server::RegisterService(const grpc::string *host, RpcService* service) {
+bool Server::RegisterService(const grpc::string* host, RpcService* service) {
   for (int i = 0; i < service->GetMethodCount(); ++i) {
     RpcServiceMethod* method = service->GetMethod(i);
-    void* tag = grpc_server_register_method(
-        server_, method->name(), host ? host->c_str() : nullptr);
+    void* tag = grpc_server_register_method(server_, method->name(),
+                                            host ? host->c_str() : nullptr);
     if (!tag) {
       gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
               method->name());
@@ -277,27 +323,15 @@ int Server::AddListeningPort(const grpc::string& addr,
   return creds->AddPortToServer(addr, server_);
 }
 
-bool Server::Start() {
+bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
   GPR_ASSERT(!started_);
   started_ = true;
   grpc_server_start(server_);
 
   if (!has_generic_service_) {
-    unknown_method_.reset(new RpcServiceMethod(
-        "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
-    // Use of emplace_back with just constructor arguments is not accepted here
-    // by gcc-4.4 because it can't match the anonymous nullptr with a proper
-    // constructor implicitly. Construct the object and use push_back.
-    sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
-  }
-  // Start processing rpcs.
-  if (!sync_methods_->empty()) {
-    for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
-      m->SetupRequest();
-      m->Request(server_, cq_.cq());
+    for (size_t i = 0; i < num_cqs; i++) {
+      new UnimplementedAsyncRequest(this, cqs[i]);
     }
-
-    ScheduleCallback();
   }
   return true;
 }
@@ -335,12 +369,14 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
 
 Server::BaseAsyncRequest::BaseAsyncRequest(
     Server* server, ServerContext* context,
-    ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+    ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
+    bool delete_on_finalize)
     : server_(server),
       context_(context),
      stream_(stream),
      call_cq_(call_cq),
      tag_(tag),
+      delete_on_finalize_(delete_on_finalize),
      call_(nullptr) {
  memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
@@ -367,14 +403,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
   // just the pointers inside call are copied here
   stream_->BindCall(&call);
   *tag = tag_;
-  delete this;
+  if (delete_on_finalize_) {
+    delete this;
+  }
   return true;
 }
 
 Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
     Server* server, ServerContext* context,
     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
-    : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+    : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
 
 void Server::RegisteredAsyncRequest::IssueRequest(
     void* registered_method, grpc_byte_buffer** payload,
@@ -388,8 +426,9 @@ void Server::RegisteredAsyncRequest::IssueRequest(
 Server::GenericAsyncRequest::GenericAsyncRequest(
     Server* server, GenericServerContext* context,
     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
-    ServerCompletionQueue* notification_cq, void* tag)
-    : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+    ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+    : BaseAsyncRequest(server, context, stream, call_cq, tag,
+                       delete_on_finalize) {
   grpc_call_details_init(&call_details_);
   GPR_ASSERT(notification_cq);
   GPR_ASSERT(call_cq);
@@ -410,6 +449,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
   return BaseAsyncRequest::FinalizeResult(tag, status);
 }
 
+bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
+                                                       bool* status) {
+  if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
+    new UnimplementedAsyncRequest(server_, cq_);
+    new UnimplementedAsyncResponse(this);
+  } else {
+    delete this;
+  }
+  return false;
+}
+
+Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
+    UnimplementedAsyncRequest* request)
+    : request_(request) {
+  Status status(StatusCode::UNIMPLEMENTED, "");
+  UnknownMethodHandler::FillOps(request_->context(), this);
+  request_->stream()->call_.PerformOps(this);
+}
+
 void Server::ScheduleCallback() {
   {
     grpc::unique_lock<grpc::mutex> lock(mu_);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 09118879f4..a13269abbf 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -59,14 +59,16 @@ void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
   async_services_.emplace_back(new NamedService<AsynchronousService>(service));
 }
 
-void ServerBuilder::RegisterService(
-    const grpc::string& addr, SynchronousService* service) {
-  services_.emplace_back(new NamedService<RpcService>(addr, service->service()));
+void ServerBuilder::RegisterService(const grpc::string& addr,
+                                    SynchronousService* service) {
+  services_.emplace_back(
+      new NamedService<RpcService>(addr, service->service()));
 }
 
-void ServerBuilder::RegisterAsyncService(
-    const grpc::string& addr, AsynchronousService* service) {
-  async_services_.emplace_back(new NamedService<AsynchronousService>(addr, service));
+void ServerBuilder::RegisterAsyncService(const grpc::string& addr,
+                                         AsynchronousService* service) {
+  async_services_.emplace_back(
+      new NamedService<AsynchronousService>(addr, service));
 }
 
 void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@@ -101,12 +103,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
     thread_pool_ = CreateDefaultThreadPool();
     thread_pool_owned = true;
   }
-  // Async services only, create a thread pool to handle requests to unknown
-  // services.
-  if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
-    thread_pool_ = new FixedSizeThreadPool(1);
-    thread_pool_owned = true;
-  }
   std::unique_ptr<Server> server(
       new Server(thread_pool_, thread_pool_owned, max_message_size_));
   for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
@@ -119,9 +115,10 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
       return nullptr;
     }
   }
-  for (auto service = async_services_.begin();
-       service != async_services_.end(); service++) {
-    if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) {
+  for (auto service = async_services_.begin(); service != async_services_.end();
+       service++) {
+    if (!server->RegisterAsyncService((*service)->host.get(),
+                                      (*service)->service)) {
       return nullptr;
     }
   }
@@ -135,7 +132,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
       *port->selected_port = r;
     }
   }
-  if (!server->Start()) {
+  if (!server->Start(&cqs_[0], cqs_.size())) {
     return nullptr;
   }
   return server;
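
The central API change above is that grpc_pollset_work no longer reports whether the deadline expired; callers pass "now" in and enforce the deadline themselves, as the updated grpc_completion_queue_next and grpc_completion_queue_pluck loops do. Below is a minimal sketch of that caller pattern; the helper name and the done_flag parameter are hypothetical, everything else comes from the diff above.

#include <grpc/support/time.h>
#include "src/core/iomgr/pollset.h"

/* Hypothetical helper: poll a pollset until *done_flag is set or the deadline
   passes. Mirrors the loop now used in completion_queue.c: the caller checks
   the deadline, and the first iteration always polls so pending work is
   flushed even when the deadline has already expired. */
static void wait_until_done_or_deadline(grpc_pollset *pollset, int *done_flag,
                                        gpr_timespec deadline) {
  grpc_pollset_worker worker;
  gpr_timespec now;
  int first_loop = 1;

  gpr_mu_lock(GRPC_POLLSET_MU(pollset));
  while (!*done_flag) {
    now = gpr_now(GPR_CLOCK_MONOTONIC);
    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) break;
    first_loop = 0;
    /* may release and reacquire the pollset mutex; tries not to block past
       deadline */
    grpc_pollset_work(pollset, &worker, now, deadline);
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
}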
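
The new grpc_poll_function hook in pollset_posix.h exists, per its comment, to let tests intercept poll() usage. A sketch of how a test might install a counting wrapper; the names counting_poll and g_poll_calls are hypothetical, only grpc_poll_function and grpc_poll_function_type come from this change.

#include <poll.h>
#include "src/core/iomgr/pollset_posix.h"

static int g_poll_calls = 0;

/* Matches grpc_poll_function_type: forwards to the real poll() while counting
   how often the pollset code polls. */
static int counting_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
  g_poll_calls++;
  return poll(fds, nfds, timeout);
}

/* Install before the test exercises any pollsets; the basic pollset and both
   multipollers changed above all route through the hook. */
static void install_counting_poll(void) { grpc_poll_function = counting_poll; }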