diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-18 09:18:33 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-18 09:18:33 -0800 |
commit | 645466e0899bfe811acc746d23d8eb15f784fdd3 (patch) | |
tree | 4814ffc1888f7011149e2f8c29ffd0e43dfa68d3 /src | |
parent | aface44b34f2fe890c28c7f4dcbc8dfbc47e4a19 (diff) |
Initial sketch
Diffstat (limited to 'src')
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 5 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 3 | ||||
-rw-r--r-- | src/cpp/common/completion_queue.cc | 51 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 9 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 18 |
5 files changed, 60 insertions, 26 deletions
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 03a0326128..7b904fc0f5 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -63,9 +63,10 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, class ClientAsyncRequest final : public CallOpBuffer { public: - void FinalizeResult(void **tag, bool *status) override { - CallOpBuffer::FinalizeResult(tag, status); + bool FinalizeResult(void **tag, bool *status) override { + bool r = CallOpBuffer::FinalizeResult(tag, status); delete this; + return r; } }; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 04af36f312..49cf9650c7 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -231,7 +231,7 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { } } -void CallOpBuffer::FinalizeResult(void** tag, bool* status) { +bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { // Release send buffers. if (send_message_buf_) { grpc_byte_buffer_destroy(send_message_buf_); @@ -274,6 +274,7 @@ void CallOpBuffer::FinalizeResult(void** tag, bool* status) { if (recv_closed_) { *recv_closed_ = cancelled_buf_ != 0; } + return true; } Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index 4419b4b2f1..c330d21a46 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -43,7 +43,7 @@ namespace grpc { CompletionQueue::CompletionQueue() { cq_ = grpc_completion_queue_create(); } -CompletionQueue::CompletionQueue(grpc_completion_queue *take) : cq_(take) {} +CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } @@ -52,34 +52,51 @@ void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } // Helper class so we can declare a unique_ptr with grpc_event class EventDeleter { public: - void operator()(grpc_event *ev) { + void operator()(grpc_event* ev) { if (ev) grpc_event_finish(ev); } }; -bool CompletionQueue::Next(void **tag, bool *ok) { +bool CompletionQueue::Next(void** tag, bool* ok) { std::unique_ptr<grpc_event, EventDeleter> ev; - ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future)); - if (ev->type == GRPC_QUEUE_SHUTDOWN) { - return false; + for (;;) { + ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future)); + if (ev->type == GRPC_QUEUE_SHUTDOWN) { + return false; + } + auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag); + *ok = ev->data.op_complete == GRPC_OP_OK; + *tag = cq_tag; + if (cq_tag->FinalizeResult(tag, ok)) { + return true; + } } - auto cq_tag = static_cast<CompletionQueueTag *>(ev->tag); - *ok = ev->data.op_complete == GRPC_OP_OK; - *tag = cq_tag; - cq_tag->FinalizeResult(tag, ok); - return true; } -bool CompletionQueue::Pluck(CompletionQueueTag *tag) { +bool CompletionQueue::Pluck(CompletionQueueTag* tag) { std::unique_ptr<grpc_event, EventDeleter> ev; - ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future)); + for (;;) { + ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future)); + bool ok = ev->data.op_complete == GRPC_OP_OK; + void* ignored = tag; + if (tag->FinalizeResult(&ignored, &ok)) { + GPR_ASSERT(ignored == tag); + return ok; + } + } +} + +void CompletionQueue::TryPluck(CompletionQueueTag* tag) { + std::unique_ptr<grpc_event, EventDeleter> ev; + + ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_past)); + if (!ev) return; bool ok = ev->data.op_complete == GRPC_OP_OK; - void *ignored = tag; - tag->FinalizeResult(&ignored, &ok); - GPR_ASSERT(ignored == tag); - return ok; + void* ignored = tag; + // the tag must be swallowed if using TryPluck + GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok)); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index ee9a1daa8e..8fffea640f 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -163,10 +163,11 @@ class Server::SyncRequest final : public CompletionQueueTag { this)); } - void FinalizeResult(void** tag, bool* status) override { + bool FinalizeResult(void** tag, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); } + return true; } class CallData final { @@ -310,11 +311,11 @@ class Server::AsyncRequest final : public CompletionQueueTag { grpc_metadata_array_destroy(&array_); } - void FinalizeResult(void** tag, bool* status) override { + bool FinalizeResult(void** tag, bool* status) override { *tag = tag_; if (*status && request_) { if (payload_) { - *status = *status && DeserializeProto(payload_, request_); + *status = DeserializeProto(payload_, request_); } else { *status = false; } @@ -331,8 +332,10 @@ class Server::AsyncRequest final : public CompletionQueueTag { } ctx_->call_ = call_; Call call(call_, server_, cq_); + // just the pointers inside call are copied here stream_->BindCall(&call); delete this; + return true; } private: diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index df4c4dc314..b9d85b95e9 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -40,7 +40,7 @@ namespace grpc { ServerContext::ServerContext() {} -ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata *metadata, +ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata, size_t metadata_count) : deadline_(Timespec2Timepoint(deadline)) { for (size_t i = 0; i < metadata_count; i++) { @@ -58,13 +58,25 @@ ServerContext::~ServerContext() { } void ServerContext::AddInitialMetadata(const grpc::string& key, - const grpc::string& value) { + const grpc::string& value) { initial_metadata_.insert(std::make_pair(key, value)); } void ServerContext::AddTrailingMetadata(const grpc::string& key, - const grpc::string& value) { + const grpc::string& value) { trailing_metadata_.insert(std::make_pair(key, value)); } +bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { + cq->TryPluck(this); + std::lock_guard<std::mutex> g(mu_); + return finalized_ ? cancelled_ != 0 : false; +} + +bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { + std::lock_guard<std::mutex> g(mu_); + finalized_ = true; + return false; +} + } // namespace grpc |