diff options
Diffstat (limited to 'include/grpc++/impl/codegen/completion_queue.h')
-rw-r--r-- | include/grpc++/impl/codegen/completion_queue.h | 43 |
1 files changed, 36 insertions, 7 deletions
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 928ab2db31..42ec287e65 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -36,9 +36,12 @@ #ifndef GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H #define GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H +#include <grpc++/impl/codegen/completion_queue_tag.h> +#include <grpc++/impl/codegen/core_codegen_interface.h> #include <grpc++/impl/codegen/grpc_library.h> #include <grpc++/impl/codegen/status.h> #include <grpc++/impl/codegen/time.h> +#include <grpc/impl/codegen/time.h> struct grpc_completion_queue; @@ -76,13 +79,17 @@ class Server; class ServerBuilder; class ServerContext; +extern CoreCodegenInterface* g_core_codegen_interface; + /// A thin wrapper around \a grpc_completion_queue (see / \a /// src/core/surface/completion_queue.h). -class CompletionQueue : private GrpcLibrary { +class CompletionQueue : private GrpcLibraryCodegen { public: /// Default constructor. Implicitly creates a \a grpc_completion_queue /// instance. - CompletionQueue(); + CompletionQueue() { + cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); + } /// Wrap \a take, taking ownership of the instance. /// @@ -90,7 +97,9 @@ class CompletionQueue : private GrpcLibrary { explicit CompletionQueue(grpc_completion_queue* take); /// Destructor. Destroys the owned wrapped completion queue / instance. - ~CompletionQueue(); + ~CompletionQueue() { + g_core_codegen_interface->grpc_completion_queue_destroy(cq_); + } /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. enum NextStatus { @@ -124,8 +133,8 @@ class CompletionQueue : private GrpcLibrary { /// /// \return true if read a regular event, false if the queue is shutting down. bool Next(void** tag, bool* ok) { - return (AsyncNextInternal(tag, ok, gpr_inf_future(GPR_CLOCK_REALTIME)) != - SHUTDOWN); + return (AsyncNextInternal(tag, ok, g_core_codegen_interface->gpr_inf_future( + GPR_CLOCK_REALTIME)) != SHUTDOWN); } /// Request the shutdown of the queue. @@ -181,11 +190,31 @@ class CompletionQueue : private GrpcLibrary { /// Wraps \a grpc_completion_queue_pluck. /// \warning Must not be mixed with calls to \a Next. - bool Pluck(CompletionQueueTag* tag); + bool Pluck(CompletionQueueTag* tag) { + auto deadline = + g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); + auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + bool ok = ev.success != 0; + void* ignored = tag; + GPR_CODEGEN_ASSERT(tag->FinalizeResult(&ignored, &ok)); + GPR_CODEGEN_ASSERT(ignored == tag); + // Ignore mutations by FinalizeResult: Pluck returns the C API status + return ev.success != 0; + } /// Performs a single polling pluck on \a tag. /// \warning Must not be mixed with calls to \a Next. - void TryPluck(CompletionQueueTag* tag); + void TryPluck(CompletionQueueTag* tag) { + auto deadline = gpr_time_0(GPR_CLOCK_REALTIME); + auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + if (ev.type == GRPC_QUEUE_TIMEOUT) return; + bool ok = ev.success != 0; + void* ignored = tag; + // the tag must be swallowed if using TryPluck + GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); + } grpc_completion_queue* cq_; // owned }; |