diff options
Diffstat (limited to 'src/cpp/common/completion_queue.cc')
-rw-r--r-- | src/cpp/common/completion_queue.cc | 89 |
1 files changed, 29 insertions, 60 deletions
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index f06da9b04f..cbeda81a0b 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -1,5 +1,4 @@ /* - * * Copyright 2014, Google Inc. * All rights reserved. * @@ -33,11 +32,12 @@ #include <grpc++/completion_queue.h> +#include <memory> + #include <grpc/grpc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> #include "src/cpp/util/time.h" -#include <grpc++/async_server_context.h> namespace grpc { @@ -47,66 +47,35 @@ CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } -CompletionQueue::CompletionType CompletionQueue::Next(void **tag) { - grpc_event *ev; - CompletionType return_type; - bool success; +// Helper class so we can declare a unique_ptr with grpc_event +class EventDeleter { + public: + void operator()(grpc_event *ev) { if (ev) grpc_event_finish(ev); } +}; - ev = grpc_completion_queue_next(cq_, gpr_inf_future); - if (!ev) { - gpr_log(GPR_ERROR, "no next event in queue"); - abort(); - } - switch (ev->type) { - case GRPC_QUEUE_SHUTDOWN: - return_type = QUEUE_CLOSED; - break; - case GRPC_READ: - *tag = ev->tag; - if (ev->data.read) { - success = static_cast<AsyncServerContext *>(ev->tag) - ->ParseRead(ev->data.read); - return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR; - } else { - return_type = SERVER_READ_ERROR; - } - break; - case GRPC_WRITE_ACCEPTED: - *tag = ev->tag; - if (ev->data.write_accepted != GRPC_OP_ERROR) { - return_type = SERVER_WRITE_OK; - } else { - return_type = SERVER_WRITE_ERROR; - } - break; - case GRPC_SERVER_RPC_NEW: - GPR_ASSERT(!ev->tag); - // Finishing the pending new rpcs after the server has been shutdown. - if (!ev->call) { - *tag = nullptr; - } else { - *tag = new AsyncServerContext( - ev->call, ev->data.server_rpc_new.method, - ev->data.server_rpc_new.host, - Timespec2Timepoint(ev->data.server_rpc_new.deadline)); - } - return_type = SERVER_RPC_NEW; - break; - case GRPC_FINISHED: - *tag = ev->tag; - return_type = RPC_END; - break; - case GRPC_FINISH_ACCEPTED: - *tag = ev->tag; - return_type = HALFCLOSE_OK; - break; - default: - // We do not handle client side messages now - gpr_log(GPR_ERROR, "client-side messages aren't supported yet"); - abort(); +bool CompletionQueue::Next(void **tag, bool *ok) { + std::unique_ptr<grpc_event, EventDeleter> ev; + + ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future)); + if (ev->type == GRPC_QUEUE_SHUTDOWN) { + return false; } - grpc_event_finish(ev); - return return_type; + auto cq_tag = static_cast<CompletionQueueTag *>(ev->tag); + *ok = ev->data.op_complete == GRPC_OP_OK; + *tag = cq_tag; + cq_tag->FinalizeResult(tag, ok); + return true; +} + +bool CompletionQueue::Pluck(CompletionQueueTag *tag) { + std::unique_ptr<grpc_event, EventDeleter> ev; + + ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future)); + bool ok = ev->data.op_complete == GRPC_OP_OK; + void *ignored = tag; + tag->FinalizeResult(&ignored, &ok); + GPR_ASSERT(ignored == tag); + return ok; } } // namespace grpc |