aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-09 17:15:03 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-09 17:15:03 -0800
commitde917062ecacbeb547c8e2e4e3f4260d3e9a2f2e (patch)
tree20befa87421de79a47afc542554549400e0bd8e6 /include/grpc++
parent7630205bdfe2b7871e810f8ea7eab388d02240bf (diff)
Refine completion queue
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/call.h9
-rw-r--r--include/grpc++/channel_interface.h2
-rw-r--r--include/grpc++/completion_queue.h53
-rw-r--r--include/grpc++/stream.h78
4 files changed, 73 insertions, 69 deletions
diff --git a/include/grpc++/call.h b/include/grpc++/call.h
index 5aa96d33b9..94215bfa98 100644
--- a/include/grpc++/call.h
+++ b/include/grpc++/call.h
@@ -55,6 +55,8 @@ class ChannelInterface;
class CallOpBuffer final : public CompletionQueueTag {
public:
+ CallOpBuffer() : return_tag_(this) {}
+
void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
@@ -67,7 +69,10 @@ class CallOpBuffer final : public CompletionQueueTag {
void FillOps(grpc_op *ops, size_t *nops);
// Called by completion queue just prior to returning from Next() or Pluck()
- FinalizeResultOutput FinalizeResult(bool status) override;
+ void FinalizeResult(void *tag, bool *status) override;
+
+ private:
+ void *return_tag_;
};
class CCallDeleter {
@@ -80,7 +85,7 @@ class Call final {
public:
Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq);
- void PerformOps(CallOpBuffer *buffer, void *tag);
+ void PerformOps(CallOpBuffer *buffer);
grpc_call *call() { return call_.get(); }
CompletionQueue *cq() { return cq_; }
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h
index 79466c9fda..c128a08a9f 100644
--- a/include/grpc++/channel_interface.h
+++ b/include/grpc++/channel_interface.h
@@ -58,7 +58,7 @@ class ChannelInterface {
virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
CompletionQueue *cq) = 0;
- virtual void PerformOpsOnCall(CallOpBuffer *ops, void *tag, Call *call) = 0;
+ virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
};
// Wrapper that begins an asynchronous unary call
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 8033fd1205..641d599c7e 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -38,21 +38,27 @@ struct grpc_completion_queue;
namespace grpc {
+template <class R>
+class ClientReader;
+template <class W>
+class ClientWriter;
+template <class R, class W>
+class ClientReaderWriter;
+template <class R>
+class ServerReader;
+template <class W>
+class ServerWriter;
+template <class R, class W>
+class ServerReaderWriter;
+
class CompletionQueue;
class CompletionQueueTag {
public:
- enum FinalizeResultOutput {
- SUCCEED,
- FAIL,
- SWALLOW,
- };
-
- virtual FinalizeResultOutput FinalizeResult(bool status) = 0;
-
- private:
- friend class CompletionQueue;
- void *user_tag_;
+ // Called prior to returning from Next(), return value
+ // is the status of the operation (return status is the default thing
+ // to do)
+ virtual void FinalizeResult(void *tag, bool *status) = 0;
};
// grpc_completion_queue wrapper class
@@ -66,22 +72,6 @@ class CompletionQueue {
// for destruction.
bool Next(void **tag, bool *ok);
- bool Pluck(void *tag);
-
- // Prepare a tag for the C api
- // Given a tag we'd like to receive from Next, what tag should we pass
- // down to the C api?
- // Usage example:
- // grpc_call_start_batch(..., cq.PrepareTagForC(tag));
- // Allows attaching some work to be executed before the original tag
- // is returned.
- // MUST be used for all events that could be surfaced through this
- // wrapping API
- void *PrepareTagForC(CompletionQueueTag *cq_tag, void *user_tag) {
- cq_tag->user_tag_ = user_tag;
- return cq_tag;
- }
-
// Shutdown has to be called, and the CompletionQueue can only be
// destructed when false is returned from Next().
void Shutdown();
@@ -89,6 +79,15 @@ class CompletionQueue {
grpc_completion_queue* cq() { return cq_; }
private:
+ template <class R> friend class ::grpc::ClientReader;
+ template <class W> friend class ::grpc::ClientWriter;
+ template <class R, class W> friend class ::grpc::ClientReaderWriter;
+ template <class R> friend class ::grpc::ServerReader;
+ template <class W> friend class ::grpc::ServerWriter;
+ template <class R, class W> friend class ::grpc::ServerReaderWriter;
+
+ bool Pluck(CompletionQueueTag *tag);
+
grpc_completion_queue* cq_; // owned
};
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 22dc44efe4..ca32d60810 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -123,23 +123,23 @@ class ClientReader final : public ClientStreamingInterface,
CallOpBuffer buf;
buf.AddSendMessage(request);
buf.AddClientSendClose();
- call_.PerformOps(&buf, (void *)1);
- cq_.Pluck((void *)1);
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
}
virtual bool Read(R *msg) override {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
- call_.PerformOps(&buf, (void *)2);
- return cq_.Pluck((void *)2);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
- call_.PerformOps(&buf, (void *)3);
- GPR_ASSERT(cq_.Pluck((void *)3));
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
return status;
}
@@ -162,15 +162,15 @@ class ClientWriter final : public ClientStreamingInterface,
virtual bool Write(const W& msg) override {
CallOpBuffer buf;
buf.AddSendMessage(msg);
- call_.PerformOps(&buf, (void *)2);
- return cq_.Pluck((void *)2);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
virtual bool WritesDone() {
CallOpBuffer buf;
buf.AddClientSendClose();
- call_.PerformOps(&buf, (void *)3);
- return cq_.Pluck((void *)3);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
// Read the final response and wait for the final status.
@@ -179,8 +179,8 @@ class ClientWriter final : public ClientStreamingInterface,
Status status;
buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(&status);
- call_.PerformOps(&buf, (void *)4);
- GPR_ASSERT(cq_.Pluck((void *)4));
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
return status;
}
@@ -204,30 +204,30 @@ class ClientReaderWriter final : public ClientStreamingInterface,
virtual bool Read(R *msg) override {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
- call_.PerformOps(&buf, (void *)2);
- return cq_.Pluck((void *)2);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
virtual bool Write(const W& msg) override {
CallOpBuffer buf;
buf.AddSendMessage(msg);
- call_.PerformOps(&buf, (void *)3);
- return cq_.Pluck((void *)3);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
virtual bool WritesDone() {
CallOpBuffer buf;
buf.AddClientSendClose();
- call_.PerformOps(&buf, (void *)4);
- return cq_.Pluck((void *)4);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
- call_.PerformOps(&buf, (void *)5);
- GPR_ASSERT(cq_.Pluck((void *)5));
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
return status;
}
@@ -244,8 +244,8 @@ class ServerReader final : public ReaderInterface<R> {
virtual bool Read(R* msg) override {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
- call_->PerformOps(&buf, (void *)2);
- return call_->cq()->Pluck((void *)2);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
private:
@@ -260,8 +260,8 @@ class ServerWriter final : public WriterInterface<W> {
virtual bool Write(const W& msg) override {
CallOpBuffer buf;
buf.AddSendMessage(msg);
- call_->PerformOps(&buf, (void *)2);
- return call_->cq()->Pluck((void *)2);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
private:
@@ -278,15 +278,15 @@ class ServerReaderWriter final : public WriterInterface<W>,
virtual bool Read(R* msg) override {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
- call_->PerformOps(&buf, (void *)2);
- return call_->cq()->Pluck((void *)2);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
virtual bool Write(const W& msg) override {
CallOpBuffer buf;
buf.AddSendMessage(msg);
- call_->PerformOps(&buf, (void *)3);
- return call_->cq()->Pluck((void *)3);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
private:
@@ -333,19 +333,19 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
CallOpBuffer buf;
buf.AddSendMessage(request);
buf.AddClientSendClose();
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
virtual void Read(R *msg, void* tag) override {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
virtual void Finish(Status* status, void* tag) override {
CallOpBuffer buf;
buf.AddClientRecvStatus(status);
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
private:
@@ -367,20 +367,20 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
virtual void Write(const W& msg, void* tag) override {
CallOpBuffer buf;
buf.AddSendMessage(msg);
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
virtual void WritesDone(void* tag) {
CallOpBuffer buf;
buf.AddClientSendClose();
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
virtual void Finish(Status* status, void* tag) override {
CallOpBuffer buf;
buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(status);
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
private:
@@ -402,25 +402,25 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
virtual void Read(R *msg, void* tag) override {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
virtual void Write(const W& msg, void* tag) override {
CallOpBuffer buf;
buf.AddSendMessage(msg);
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
virtual void WritesDone(void* tag) {
CallOpBuffer buf;
buf.AddClientSendClose();
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
virtual void Finish(Status* status, void* tag) override {
CallOpBuffer buf;
buf.AddClientRecvStatus(status);
- call_.PerformOps(&buf, tag);
+ call_.PerformOps(&buf);
}
private:
@@ -437,7 +437,7 @@ class ServerAsyncResponseWriter final {
virtual void Write(const W& msg, void* tag) override {
CallOpBuffer buf;
buf.AddSendMessage(msg);
- call_->PerformOps(&buf, tag);
+ call_->PerformOps(&buf);
}
private: