aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-18 09:18:33 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-18 09:18:33 -0800
commit645466e0899bfe811acc746d23d8eb15f784fdd3 (patch)
tree4814ffc1888f7011149e2f8c29ffd0e43dfa68d3 /src
parentaface44b34f2fe890c28c7f4dcbc8dfbc47e4a19 (diff)
Initial sketch
Diffstat (limited to 'src')
-rw-r--r--src/cpp/client/client_unary_call.cc5
-rw-r--r--src/cpp/common/call.cc3
-rw-r--r--src/cpp/common/completion_queue.cc51
-rw-r--r--src/cpp/server/server.cc9
-rw-r--r--src/cpp/server/server_context.cc18
5 files changed, 60 insertions, 26 deletions
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
index 03a0326128..7b904fc0f5 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/src/cpp/client/client_unary_call.cc
@@ -63,9 +63,10 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
class ClientAsyncRequest final : public CallOpBuffer {
public:
- void FinalizeResult(void **tag, bool *status) override {
- CallOpBuffer::FinalizeResult(tag, status);
+ bool FinalizeResult(void **tag, bool *status) override {
+ bool r = CallOpBuffer::FinalizeResult(tag, status);
delete this;
+ return r;
}
};
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 04af36f312..49cf9650c7 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -231,7 +231,7 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
}
}
-void CallOpBuffer::FinalizeResult(void** tag, bool* status) {
+bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
// Release send buffers.
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
@@ -274,6 +274,7 @@ void CallOpBuffer::FinalizeResult(void** tag, bool* status) {
if (recv_closed_) {
*recv_closed_ = cancelled_buf_ != 0;
}
+ return true;
}
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 4419b4b2f1..c330d21a46 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -43,7 +43,7 @@ namespace grpc {
CompletionQueue::CompletionQueue() { cq_ = grpc_completion_queue_create(); }
-CompletionQueue::CompletionQueue(grpc_completion_queue *take) : cq_(take) {}
+CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
@@ -52,34 +52,51 @@ void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
// Helper class so we can declare a unique_ptr with grpc_event
class EventDeleter {
public:
- void operator()(grpc_event *ev) {
+ void operator()(grpc_event* ev) {
if (ev) grpc_event_finish(ev);
}
};
-bool CompletionQueue::Next(void **tag, bool *ok) {
+bool CompletionQueue::Next(void** tag, bool* ok) {
std::unique_ptr<grpc_event, EventDeleter> ev;
- ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
- if (ev->type == GRPC_QUEUE_SHUTDOWN) {
- return false;
+ for (;;) {
+ ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
+ if (ev->type == GRPC_QUEUE_SHUTDOWN) {
+ return false;
+ }
+ auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag);
+ *ok = ev->data.op_complete == GRPC_OP_OK;
+ *tag = cq_tag;
+ if (cq_tag->FinalizeResult(tag, ok)) {
+ return true;
+ }
}
- auto cq_tag = static_cast<CompletionQueueTag *>(ev->tag);
- *ok = ev->data.op_complete == GRPC_OP_OK;
- *tag = cq_tag;
- cq_tag->FinalizeResult(tag, ok);
- return true;
}
-bool CompletionQueue::Pluck(CompletionQueueTag *tag) {
+bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
std::unique_ptr<grpc_event, EventDeleter> ev;
- ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future));
+ for (;;) {
+ ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future));
+ bool ok = ev->data.op_complete == GRPC_OP_OK;
+ void* ignored = tag;
+ if (tag->FinalizeResult(&ignored, &ok)) {
+ GPR_ASSERT(ignored == tag);
+ return ok;
+ }
+ }
+}
+
+void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
+ std::unique_ptr<grpc_event, EventDeleter> ev;
+
+ ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_past));
+ if (!ev) return;
bool ok = ev->data.op_complete == GRPC_OP_OK;
- void *ignored = tag;
- tag->FinalizeResult(&ignored, &ok);
- GPR_ASSERT(ignored == tag);
- return ok;
+ void* ignored = tag;
+ // the tag must be swallowed if using TryPluck
+ GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index ee9a1daa8e..8fffea640f 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -163,10 +163,11 @@ class Server::SyncRequest final : public CompletionQueueTag {
this));
}
- void FinalizeResult(void** tag, bool* status) override {
+ bool FinalizeResult(void** tag, bool* status) override {
if (!*status) {
grpc_completion_queue_destroy(cq_);
}
+ return true;
}
class CallData final {
@@ -310,11 +311,11 @@ class Server::AsyncRequest final : public CompletionQueueTag {
grpc_metadata_array_destroy(&array_);
}
- void FinalizeResult(void** tag, bool* status) override {
+ bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
if (*status && request_) {
if (payload_) {
- *status = *status && DeserializeProto(payload_, request_);
+ *status = DeserializeProto(payload_, request_);
} else {
*status = false;
}
@@ -331,8 +332,10 @@ class Server::AsyncRequest final : public CompletionQueueTag {
}
ctx_->call_ = call_;
Call call(call_, server_, cq_);
+ // just the pointers inside call are copied here
stream_->BindCall(&call);
delete this;
+ return true;
}
private:
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index df4c4dc314..b9d85b95e9 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -40,7 +40,7 @@ namespace grpc {
ServerContext::ServerContext() {}
-ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata *metadata,
+ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count)
: deadline_(Timespec2Timepoint(deadline)) {
for (size_t i = 0; i < metadata_count; i++) {
@@ -58,13 +58,25 @@ ServerContext::~ServerContext() {
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
- const grpc::string& value) {
+ const grpc::string& value) {
initial_metadata_.insert(std::make_pair(key, value));
}
void ServerContext::AddTrailingMetadata(const grpc::string& key,
- const grpc::string& value) {
+ const grpc::string& value) {
trailing_metadata_.insert(std::make_pair(key, value));
}
+bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
+ cq->TryPluck(this);
+ std::lock_guard<std::mutex> g(mu_);
+ return finalized_ ? cancelled_ != 0 : false;
+}
+
+bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
+ std::lock_guard<std::mutex> g(mu_);
+ finalized_ = true;
+ return false;
+}
+
} // namespace grpc