aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-03-07 18:19:12 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2016-03-07 18:19:12 -0800
commite1ce31eda3321bb0052416ba47145809a8199f1e (patch)
treea47309ad10b947c9672485ce7514071ae355a339
parent6848c4e14584e55859018b30390589c418b93358 (diff)
wip. cq refactored
-rw-r--r--include/grpc++/impl/codegen/client_context.h3
-rw-r--r--include/grpc++/impl/codegen/completion_queue.h36
-rw-r--r--include/grpc++/impl/codegen/core_codegen_interface.h21
-rw-r--r--include/grpc++/impl/codegen/grpc_library.h5
-rw-r--r--include/grpc++/impl/codegen/impl/async_stream.h25
-rw-r--r--include/grpc++/impl/codegen/server_interface.h5
-rw-r--r--include/grpc++/impl/codegen/service_type.h10
-rw-r--r--include/grpc/impl/codegen/time.h31
-rw-r--r--src/core/support/time.c24
-rw-r--r--src/cpp/codegen/core_codegen.cc34
-rw-r--r--src/cpp/common/completion_queue.cc33
11 files changed, 118 insertions, 109 deletions
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index db2afe930c..271d464583 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -54,6 +54,7 @@
#include <string>
#include <grpc++/impl/codegen/config.h>
+#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/security/auth_context.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc++/impl/codegen/string_ref.h>
@@ -192,7 +193,7 @@ class ClientContext {
/// \return A multimap of initial metadata key-value pairs from the server.
const std::multimap<grpc::string_ref, grpc::string_ref>&
GetServerInitialMetadata() {
- GPR_ASSERT(initial_metadata_received_);
+ GPR_CODEGEN_ASSERT(initial_metadata_received_);
return recv_initial_metadata_;
}
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index 102831e1c9..6473494d86 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -36,6 +36,9 @@
#ifndef GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H
#define GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H
+#include <grpc/impl/codegen/time.h>
+#include <grpc++/impl/codegen/completion_queue_tag.h>
+#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc++/impl/codegen/time.h>
@@ -76,13 +79,17 @@ class Server;
class ServerBuilder;
class ServerContext;
+extern CoreCodegenInterface* g_core_codegen_interface;
+
/// A thin wrapper around \a grpc_completion_queue (see / \a
/// src/core/surface/completion_queue.h).
class CompletionQueue : private GrpcLibrary {
public:
/// Default constructor. Implicitly creates a \a grpc_completion_queue
/// instance.
- CompletionQueue();
+ CompletionQueue() {
+ cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr);
+ }
/// Wrap \a take, taking ownership of the instance.
///
@@ -90,7 +97,9 @@ class CompletionQueue : private GrpcLibrary {
explicit CompletionQueue(grpc_completion_queue* take);
/// Destructor. Destroys the owned wrapped completion queue / instance.
- ~CompletionQueue();
+ ~CompletionQueue() {
+ g_core_codegen_interface->grpc_completion_queue_destroy(cq_);
+ }
/// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
enum NextStatus {
@@ -181,10 +190,29 @@ class CompletionQueue : private GrpcLibrary {
/// Wraps \a grpc_completion_queue_pluck.
/// \warning Must not be mixed with calls to \a Next.
- bool Pluck(CompletionQueueTag* tag);
+ bool Pluck(CompletionQueueTag* tag) {
+ auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
+ cq_, tag, deadline, nullptr);
+ bool ok = ev.success != 0;
+ void* ignored = tag;
+ GPR_CODEGEN_ASSERT(tag->FinalizeResult(&ignored, &ok));
+ GPR_CODEGEN_ASSERT(ignored == tag);
+ // Ignore mutations by FinalizeResult: Pluck returns the C API status
+ return ev.success != 0;
+ }
/// Performs a single polling pluck on \a tag.
- void TryPluck(CompletionQueueTag* tag);
+ void TryPluck(CompletionQueueTag* tag) {
+ auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
+ auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
+ cq_, tag, deadline, nullptr);
+ if (ev.type == GRPC_QUEUE_TIMEOUT) return;
+ bool ok = ev.success != 0;
+ void* ignored = tag;
+ // the tag must be swallowed if using TryPluck
+ GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
+ }
grpc_completion_queue* cq_; // owned
};
diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h
index b1b128cc4a..f043f96072 100644
--- a/include/grpc++/impl/codegen/core_codegen_interface.h
+++ b/include/grpc++/impl/codegen/core_codegen_interface.h
@@ -43,9 +43,13 @@ namespace grpc {
class CoreCodegenInterface {
public:
- virtual grpc_completion_queue* CompletionQueueCreate() = 0;
- virtual grpc_event CompletionQueuePluck(grpc_completion_queue* cq, void* tag,
- gpr_timespec deadline) = 0;
+ virtual grpc_completion_queue* grpc_completion_queue_create(
+ void* reserved) = 0;
+ virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0;
+ virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq,
+ void* tag,
+ gpr_timespec deadline,
+ void* reserved) = 0;
// Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization
@@ -70,11 +74,12 @@ class CoreCodegenInterface {
};
/* XXX */
-#define GPR_CODEGEN_ASSERT(x) \
- do { \
- if (!(x)) { \
- g_core_codegen_interface->assert_fail(#x); \
- } \
+#define GPR_CODEGEN_ASSERT(x) \
+ do { \
+ if (!(x)) { \
+ extern CoreCodegenInterface* g_core_codegen_interface; \
+ g_core_codegen_interface->assert_fail(#x); \
+ } \
} while (0)
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/grpc_library.h b/include/grpc++/impl/codegen/grpc_library.h
index eb7152a2c6..ef076315f5 100644
--- a/include/grpc++/impl/codegen/grpc_library.h
+++ b/include/grpc++/impl/codegen/grpc_library.h
@@ -35,6 +35,7 @@
#define GRPCXX_IMPL_CODEGEN_GRPC_LIBRARY_H
#include <grpc/impl/codegen/log.h>
+#include <grpc++/impl/codegen/core_codegen_interface.h>
namespace grpc {
@@ -49,13 +50,13 @@ extern GrpcLibraryInterface* g_glip;
class GrpcLibrary {
public:
GrpcLibrary() {
- GPR_ASSERT(g_glip &&
+ GPR_CODEGEN_ASSERT(g_glip &&
"gRPC library not initialized. See "
"grpc::internal::GrpcLibraryInitializer.");
g_glip->init();
}
virtual ~GrpcLibrary() {
- GPR_ASSERT(g_glip &&
+ GPR_CODEGEN_ASSERT(g_glip &&
"gRPC library not initialized. See "
"grpc::internal::GrpcLibraryInitializer.");
g_glip->shutdown();
diff --git a/include/grpc++/impl/codegen/impl/async_stream.h b/include/grpc++/impl/codegen/impl/async_stream.h
index b0410485f8..fea935a362 100644
--- a/include/grpc++/impl/codegen/impl/async_stream.h
+++ b/include/grpc++/impl/codegen/impl/async_stream.h
@@ -36,6 +36,7 @@
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/impl/codegen/call.h>
+#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/service_type.h>
#include <grpc++/impl/codegen/server_context.h>
#include <grpc++/impl/codegen/status.h>
@@ -109,13 +110,13 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
- GPR_ASSERT(init_ops_.SendMessage(request).ok());
+ GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
init_ops_.ClientSendClose();
call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
- GPR_ASSERT(!context_->initial_metadata_received_);
+ GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
@@ -177,7 +178,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
- GPR_ASSERT(!context_->initial_metadata_received_);
+ GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
@@ -187,7 +188,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
- GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
@@ -243,7 +244,7 @@ class ClientAsyncReaderWriter GRPC_FINAL
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
- GPR_ASSERT(!context_->initial_metadata_received_);
+ GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
@@ -262,7 +263,7 @@ class ClientAsyncReaderWriter GRPC_FINAL
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
- GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
@@ -300,7 +301,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
- GPR_ASSERT(!ctx_->sent_initial_metadata_);
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@@ -331,7 +332,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
}
void FinishWithError(const Status& status, void* tag) {
- GPR_ASSERT(!status.ok());
+ GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@@ -360,7 +361,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
- GPR_ASSERT(!ctx_->sent_initial_metadata_);
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@@ -375,7 +376,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
- GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
@@ -409,7 +410,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
- GPR_ASSERT(!ctx_->sent_initial_metadata_);
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@@ -430,7 +431,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
- GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h
index f934619c20..1dcc01285a 100644
--- a/include/grpc++/impl/codegen/server_interface.h
+++ b/include/grpc++/impl/codegen/server_interface.h
@@ -37,6 +37,7 @@
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc++/impl/codegen/call_hook.h>
#include <grpc++/impl/codegen/completion_queue_tag.h>
+#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/rpc_service_method.h>
namespace grpc {
@@ -223,7 +224,7 @@ class ServerInterface : public CallHook {
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
- GPR_ASSERT(method);
+ GPR_CODEGEN_ASSERT(method);
new PayloadAsyncRequest<Message>(method->server_tag(), this, context,
stream, call_cq, notification_cq, tag,
message);
@@ -233,7 +234,7 @@ class ServerInterface : public CallHook {
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- GPR_ASSERT(method);
+ GPR_CODEGEN_ASSERT(method);
new NoPayloadAsyncRequest(method->server_tag(), this, context, stream,
call_cq, notification_cq, tag);
}
diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h
index deb91a41d9..901468e3ee 100644
--- a/include/grpc++/impl/codegen/service_type.h
+++ b/include/grpc++/impl/codegen/service_type.h
@@ -35,6 +35,7 @@
#define GRPCXX_IMPL_CODEGEN_SERVICE_TYPE_H
#include <grpc++/impl/codegen/config.h>
+#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/rpc_service_method.h>
#include <grpc++/impl/codegen/serialization_traits.h>
#include <grpc++/impl/codegen/server_interface.h>
@@ -131,21 +132,16 @@ class Service {
void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
void MarkMethodAsync(int index) {
- if (methods_[index].get() == nullptr) {
- gpr_log(GPR_ERROR,
+ GPR_CODEGEN_ASSERT(methods_[index].get() != nullptr &&
"Cannot mark the method as 'async' because it has already been "
"marked as 'generic'.");
- return;
- }
methods_[index]->ResetHandler();
}
void MarkMethodGeneric(int index) {
- if (methods_[index]->handler() == nullptr) {
- gpr_log(GPR_ERROR,
+ GPR_CODEGEN_ASSERT(methods_[index]->handler() != nullptr &&
"Cannot mark the method as 'generic' because it has already been "
"marked as 'async'.");
- }
methods_[index].reset();
}
diff --git a/include/grpc/impl/codegen/time.h b/include/grpc/impl/codegen/time.h
index c22bedfe77..9776dabc11 100644
--- a/include/grpc/impl/codegen/time.h
+++ b/include/grpc/impl/codegen/time.h
@@ -69,10 +69,33 @@ typedef struct gpr_timespec {
} gpr_timespec;
/* Time constants. */
-GPRAPI gpr_timespec
-gpr_time_0(gpr_clock_type type); /* The zero time interval. */
-GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type); /* The far future */
-GPRAPI gpr_timespec gpr_inf_past(gpr_clock_type type); /* The far past. */
+/* The zero time interval. */
+GPRAPI static inline gpr_timespec gpr_time_0(gpr_clock_type type) {
+ gpr_timespec out;
+ out.tv_sec = 0;
+ out.tv_nsec = 0;
+ out.clock_type = type;
+ return out;
+}
+
+/* The far future */
+GPRAPI static inline gpr_timespec gpr_inf_future(gpr_clock_type type) {
+ gpr_timespec out;
+ out.tv_sec = INT64_MAX;
+ out.tv_nsec = 0;
+ out.clock_type = type;
+ return out;
+}
+
+/* The far past. */
+GPRAPI static inline gpr_timespec gpr_inf_past(gpr_clock_type type) {
+ gpr_timespec out;
+ out.tv_sec = INT64_MIN;
+ out.tv_nsec = 0;
+ out.clock_type = type;
+ return out;
+}
+
#define GPR_MS_PER_SEC 1000
#define GPR_US_PER_SEC 1000000
diff --git a/src/core/support/time.c b/src/core/support/time.c
index 423d12ffc0..fec3c7a2c5 100644
--- a/src/core/support/time.c
+++ b/src/core/support/time.c
@@ -56,30 +56,6 @@ gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b) {
return gpr_time_cmp(a, b) > 0 ? a : b;
}
-gpr_timespec gpr_time_0(gpr_clock_type type) {
- gpr_timespec out;
- out.tv_sec = 0;
- out.tv_nsec = 0;
- out.clock_type = type;
- return out;
-}
-
-gpr_timespec gpr_inf_future(gpr_clock_type type) {
- gpr_timespec out;
- out.tv_sec = INT64_MAX;
- out.tv_nsec = 0;
- out.clock_type = type;
- return out;
-}
-
-gpr_timespec gpr_inf_past(gpr_clock_type type) {
- gpr_timespec out;
- out.tv_sec = INT64_MIN;
- out.tv_nsec = 0;
- out.clock_type = type;
- return out;
-}
-
/* TODO(ctiller): consider merging _nanos, _micros, _millis into a single
function for maintainability. Similarly for _seconds, _minutes, and _hours */
diff --git a/src/cpp/codegen/core_codegen.cc b/src/cpp/codegen/core_codegen.cc
index ccae206291..00e8005f28 100644
--- a/src/cpp/codegen/core_codegen.cc
+++ b/src/cpp/codegen/core_codegen.cc
@@ -54,8 +54,7 @@ const int kGrpcBufferWriterMaxBufferLength = 8192;
class GrpcBufferWriter GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
- explicit GrpcBufferWriter(grpc_byte_buffer** bp,
- int block_size)
+ explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
@@ -170,22 +169,23 @@ namespace grpc {
class CoreCodegen : public CoreCodegenInterface {
private:
- grpc_completion_queue* CompletionQueueCreate() override {
- return grpc_completion_queue_create(nullptr);
+ grpc_completion_queue* grpc_completion_queue_create(void* reserved) override {
+ return ::grpc_completion_queue_create(reserved);
}
- grpc_event CompletionQueuePluck(grpc_completion_queue* cq, void* tag,
- gpr_timespec deadline) override {
- return grpc_completion_queue_pluck(cq, tag, deadline, nullptr);
+ void grpc_completion_queue_destroy(grpc_completion_queue* cq) override {
+ ::grpc_completion_queue_destroy(cq);
}
- void* gpr_malloc(size_t size) override {
- return ::gpr_malloc(size);
+ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline,
+ void* reserved) override {
+ return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved);
}
- void gpr_free(void* p) override {
- return ::gpr_free(p);
- }
+ void* gpr_malloc(size_t size) override { return ::gpr_malloc(size); }
+
+ void gpr_free(void* p) override { return ::gpr_free(p); }
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override {
::grpc_byte_buffer_destroy(bb);
@@ -205,13 +205,14 @@ class CoreCodegen : public CoreCodegenInterface {
}
Status SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** bp) override {
+ grpc_byte_buffer** bp) override {
GPR_TIMER_SCOPE("SerializeProto", 0);
int byte_size = msg.ByteSize();
if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
gpr_slice slice = gpr_slice_malloc(byte_size);
- GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
- msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
+ GPR_ASSERT(
+ GPR_SLICE_END_PTR(slice) ==
+ msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
*bp = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return Status::OK;
@@ -224,7 +225,8 @@ class CoreCodegen : public CoreCodegenInterface {
}
Status DeserializeProto(grpc_byte_buffer* buffer,
- grpc::protobuf::Message* msg, int max_message_size) override {
+ grpc::protobuf::Message* msg,
+ int max_message_size) override {
GPR_TIMER_SCOPE("DeserializeProto", 0);
if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 4f76dfff1d..729dc33749 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -34,7 +34,6 @@
#include <memory>
-#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/time.h>
#include <grpc/grpc.h>
@@ -43,16 +42,13 @@
namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
-CompletionQueue::CompletionQueue() {
- g_gli_initializer.summon();
- cq_ = grpc_completion_queue_create(nullptr);
-}
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
-CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
-
-void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
+void CompletionQueue::Shutdown() {
+ g_gli_initializer.summon();
+ grpc_completion_queue_shutdown(cq_);
+}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) {
@@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
}
}
-bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
- auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
- bool ok = ev.success != 0;
- void* ignored = tag;
- GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
- GPR_ASSERT(ignored == tag);
- // Ignore mutations by FinalizeResult: Pluck returns the C API status
- return ev.success != 0;
-}
-
-void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
- auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
- auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
- if (ev.type == GRPC_QUEUE_TIMEOUT) return;
- bool ok = ev.success != 0;
- void* ignored = tag;
- // the tag must be swallowed if using TryPluck
- GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
-}
-
} // namespace grpc