aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-03-07 18:19:12 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2016-03-07 18:19:12 -0800
commite1ce31eda3321bb0052416ba47145809a8199f1e (patch)
treea47309ad10b947c9672485ce7514071ae355a339 /src/cpp
parent6848c4e14584e55859018b30390589c418b93358 (diff)
wip. cq refactored
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/codegen/core_codegen.cc34
-rw-r--r--src/cpp/common/completion_queue.cc33
2 files changed, 22 insertions, 45 deletions
diff --git a/src/cpp/codegen/core_codegen.cc b/src/cpp/codegen/core_codegen.cc
index ccae206291..00e8005f28 100644
--- a/src/cpp/codegen/core_codegen.cc
+++ b/src/cpp/codegen/core_codegen.cc
@@ -54,8 +54,7 @@ const int kGrpcBufferWriterMaxBufferLength = 8192;
class GrpcBufferWriter GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
- explicit GrpcBufferWriter(grpc_byte_buffer** bp,
- int block_size)
+ explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
@@ -170,22 +169,23 @@ namespace grpc {
class CoreCodegen : public CoreCodegenInterface {
private:
- grpc_completion_queue* CompletionQueueCreate() override {
- return grpc_completion_queue_create(nullptr);
+ grpc_completion_queue* grpc_completion_queue_create(void* reserved) override {
+ return ::grpc_completion_queue_create(reserved);
}
- grpc_event CompletionQueuePluck(grpc_completion_queue* cq, void* tag,
- gpr_timespec deadline) override {
- return grpc_completion_queue_pluck(cq, tag, deadline, nullptr);
+ void grpc_completion_queue_destroy(grpc_completion_queue* cq) override {
+ ::grpc_completion_queue_destroy(cq);
}
- void* gpr_malloc(size_t size) override {
- return ::gpr_malloc(size);
+ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline,
+ void* reserved) override {
+ return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved);
}
- void gpr_free(void* p) override {
- return ::gpr_free(p);
- }
+ void* gpr_malloc(size_t size) override { return ::gpr_malloc(size); }
+
+ void gpr_free(void* p) override { return ::gpr_free(p); }
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override {
::grpc_byte_buffer_destroy(bb);
@@ -205,13 +205,14 @@ class CoreCodegen : public CoreCodegenInterface {
}
Status SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** bp) override {
+ grpc_byte_buffer** bp) override {
GPR_TIMER_SCOPE("SerializeProto", 0);
int byte_size = msg.ByteSize();
if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
gpr_slice slice = gpr_slice_malloc(byte_size);
- GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
- msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
+ GPR_ASSERT(
+ GPR_SLICE_END_PTR(slice) ==
+ msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
*bp = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return Status::OK;
@@ -224,7 +225,8 @@ class CoreCodegen : public CoreCodegenInterface {
}
Status DeserializeProto(grpc_byte_buffer* buffer,
- grpc::protobuf::Message* msg, int max_message_size) override {
+ grpc::protobuf::Message* msg,
+ int max_message_size) override {
GPR_TIMER_SCOPE("DeserializeProto", 0);
if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 4f76dfff1d..729dc33749 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -34,7 +34,6 @@
#include <memory>
-#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/time.h>
#include <grpc/grpc.h>
@@ -43,16 +42,13 @@
namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
-CompletionQueue::CompletionQueue() {
- g_gli_initializer.summon();
- cq_ = grpc_completion_queue_create(nullptr);
-}
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
-CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
-
-void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
+void CompletionQueue::Shutdown() {
+ g_gli_initializer.summon();
+ grpc_completion_queue_shutdown(cq_);
+}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) {
@@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
}
}
-bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
- auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
- bool ok = ev.success != 0;
- void* ignored = tag;
- GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
- GPR_ASSERT(ignored == tag);
- // Ignore mutations by FinalizeResult: Pluck returns the C API status
- return ev.success != 0;
-}
-
-void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
- auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
- auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
- if (ev.type == GRPC_QUEUE_TIMEOUT) return;
- bool ok = ev.success != 0;
- void* ignored = tag;
- // the tag must be swallowed if using TryPluck
- GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
-}
-
} // namespace grpc