diff options
author | David Garcia Quintas <dgq@google.com> | 2016-03-07 18:19:12 -0800 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-03-07 18:19:12 -0800 |
commit | e1ce31eda3321bb0052416ba47145809a8199f1e (patch) | |
tree | a47309ad10b947c9672485ce7514071ae355a339 /src/cpp | |
parent | 6848c4e14584e55859018b30390589c418b93358 (diff) |
wip. cq refactored
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/codegen/core_codegen.cc | 34 | ||||
-rw-r--r-- | src/cpp/common/completion_queue.cc | 33 |
2 files changed, 22 insertions, 45 deletions
diff --git a/src/cpp/codegen/core_codegen.cc b/src/cpp/codegen/core_codegen.cc index ccae206291..00e8005f28 100644 --- a/src/cpp/codegen/core_codegen.cc +++ b/src/cpp/codegen/core_codegen.cc @@ -54,8 +54,7 @@ const int kGrpcBufferWriterMaxBufferLength = 8192; class GrpcBufferWriter GRPC_FINAL : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: - explicit GrpcBufferWriter(grpc_byte_buffer** bp, - int block_size) + explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) : block_size_(block_size), byte_count_(0), have_backup_(false) { *bp = grpc_raw_byte_buffer_create(NULL, 0); slice_buffer_ = &(*bp)->data.raw.slice_buffer; @@ -170,22 +169,23 @@ namespace grpc { class CoreCodegen : public CoreCodegenInterface { private: - grpc_completion_queue* CompletionQueueCreate() override { - return grpc_completion_queue_create(nullptr); + grpc_completion_queue* grpc_completion_queue_create(void* reserved) override { + return ::grpc_completion_queue_create(reserved); } - grpc_event CompletionQueuePluck(grpc_completion_queue* cq, void* tag, - gpr_timespec deadline) override { - return grpc_completion_queue_pluck(cq, tag, deadline, nullptr); + void grpc_completion_queue_destroy(grpc_completion_queue* cq) override { + ::grpc_completion_queue_destroy(cq); } - void* gpr_malloc(size_t size) override { - return ::gpr_malloc(size); + grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, + gpr_timespec deadline, + void* reserved) override { + return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved); } - void gpr_free(void* p) override { - return ::gpr_free(p); - } + void* gpr_malloc(size_t size) override { return ::gpr_malloc(size); } + + void gpr_free(void* p) override { return ::gpr_free(p); } void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override { ::grpc_byte_buffer_destroy(bb); @@ -205,13 +205,14 @@ class CoreCodegen : public CoreCodegenInterface { } Status SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp) override { + grpc_byte_buffer** bp) override { GPR_TIMER_SCOPE("SerializeProto", 0); int byte_size = msg.ByteSize(); if (byte_size <= kGrpcBufferWriterMaxBufferLength) { gpr_slice slice = gpr_slice_malloc(byte_size); - GPR_ASSERT(GPR_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); + GPR_ASSERT( + GPR_SLICE_END_PTR(slice) == + msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); *bp = grpc_raw_byte_buffer_create(&slice, 1); gpr_slice_unref(slice); return Status::OK; @@ -224,7 +225,8 @@ class CoreCodegen : public CoreCodegenInterface { } Status DeserializeProto(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg, int max_message_size) override { + grpc::protobuf::Message* msg, + int max_message_size) override { GPR_TIMER_SCOPE("DeserializeProto", 0); if (buffer == nullptr) { return Status(StatusCode::INTERNAL, "No payload"); diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index 4f76dfff1d..729dc33749 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -34,7 +34,6 @@ #include <memory> -#include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/grpc_library.h> #include <grpc++/support/time.h> #include <grpc/grpc.h> @@ -43,16 +42,13 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -CompletionQueue::CompletionQueue() { - g_gli_initializer.summon(); - cq_ = grpc_completion_queue_create(nullptr); -} CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} -CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } - -void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } +void CompletionQueue::Shutdown() { + g_gli_initializer.summon(); + grpc_completion_queue_shutdown(cq_); +} CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( void** tag, bool* ok, gpr_timespec deadline) { @@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( } } -bool CompletionQueue::Pluck(CompletionQueueTag* tag) { - auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); - bool ok = ev.success != 0; - void* ignored = tag; - GPR_ASSERT(tag->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == tag); - // Ignore mutations by FinalizeResult: Pluck returns the C API status - return ev.success != 0; -} - -void CompletionQueue::TryPluck(CompletionQueueTag* tag) { - auto deadline = gpr_time_0(GPR_CLOCK_REALTIME); - auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); - if (ev.type == GRPC_QUEUE_TIMEOUT) return; - bool ok = ev.success != 0; - void* ignored = tag; - // the tag must be swallowed if using TryPluck - GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok)); -} - } // namespace grpc |