diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-09 17:15:03 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-09 17:15:03 -0800 |
commit | de917062ecacbeb547c8e2e4e3f4260d3e9a2f2e (patch) | |
tree | 20befa87421de79a47afc542554549400e0bd8e6 /include/grpc++ | |
parent | 7630205bdfe2b7871e810f8ea7eab388d02240bf (diff) |
Refine completion queue
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/call.h | 9 | ||||
-rw-r--r-- | include/grpc++/channel_interface.h | 2 | ||||
-rw-r--r-- | include/grpc++/completion_queue.h | 53 | ||||
-rw-r--r-- | include/grpc++/stream.h | 78 |
4 files changed, 73 insertions, 69 deletions
diff --git a/include/grpc++/call.h b/include/grpc++/call.h index 5aa96d33b9..94215bfa98 100644 --- a/include/grpc++/call.h +++ b/include/grpc++/call.h @@ -55,6 +55,8 @@ class ChannelInterface; class CallOpBuffer final : public CompletionQueueTag { public: + CallOpBuffer() : return_tag_(this) {} + void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata); void AddSendMessage(const google::protobuf::Message &message); void AddRecvMessage(google::protobuf::Message *message); @@ -67,7 +69,10 @@ class CallOpBuffer final : public CompletionQueueTag { void FillOps(grpc_op *ops, size_t *nops); // Called by completion queue just prior to returning from Next() or Pluck() - FinalizeResultOutput FinalizeResult(bool status) override; + void FinalizeResult(void *tag, bool *status) override; + + private: + void *return_tag_; }; class CCallDeleter { @@ -80,7 +85,7 @@ class Call final { public: Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq); - void PerformOps(CallOpBuffer *buffer, void *tag); + void PerformOps(CallOpBuffer *buffer); grpc_call *call() { return call_.get(); } CompletionQueue *cq() { return cq_; } diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h index 79466c9fda..c128a08a9f 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel_interface.h @@ -58,7 +58,7 @@ class ChannelInterface { virtual Call CreateCall(const RpcMethod &method, ClientContext *context, CompletionQueue *cq) = 0; - virtual void PerformOpsOnCall(CallOpBuffer *ops, void *tag, Call *call) = 0; + virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; }; // Wrapper that begins an asynchronous unary call diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 8033fd1205..641d599c7e 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -38,21 +38,27 @@ struct grpc_completion_queue; namespace grpc { +template <class R> +class ClientReader; +template <class W> +class ClientWriter; +template <class R, class W> +class ClientReaderWriter; +template <class R> +class ServerReader; +template <class W> +class ServerWriter; +template <class R, class W> +class ServerReaderWriter; + class CompletionQueue; class CompletionQueueTag { public: - enum FinalizeResultOutput { - SUCCEED, - FAIL, - SWALLOW, - }; - - virtual FinalizeResultOutput FinalizeResult(bool status) = 0; - - private: - friend class CompletionQueue; - void *user_tag_; + // Called prior to returning from Next(), return value + // is the status of the operation (return status is the default thing + // to do) + virtual void FinalizeResult(void *tag, bool *status) = 0; }; // grpc_completion_queue wrapper class @@ -66,22 +72,6 @@ class CompletionQueue { // for destruction. bool Next(void **tag, bool *ok); - bool Pluck(void *tag); - - // Prepare a tag for the C api - // Given a tag we'd like to receive from Next, what tag should we pass - // down to the C api? - // Usage example: - // grpc_call_start_batch(..., cq.PrepareTagForC(tag)); - // Allows attaching some work to be executed before the original tag - // is returned. - // MUST be used for all events that could be surfaced through this - // wrapping API - void *PrepareTagForC(CompletionQueueTag *cq_tag, void *user_tag) { - cq_tag->user_tag_ = user_tag; - return cq_tag; - } - // Shutdown has to be called, and the CompletionQueue can only be // destructed when false is returned from Next(). void Shutdown(); @@ -89,6 +79,15 @@ class CompletionQueue { grpc_completion_queue* cq() { return cq_; } private: + template <class R> friend class ::grpc::ClientReader; + template <class W> friend class ::grpc::ClientWriter; + template <class R, class W> friend class ::grpc::ClientReaderWriter; + template <class R> friend class ::grpc::ServerReader; + template <class W> friend class ::grpc::ServerWriter; + template <class R, class W> friend class ::grpc::ServerReaderWriter; + + bool Pluck(CompletionQueueTag *tag); + grpc_completion_queue* cq_; // owned }; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 22dc44efe4..ca32d60810 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -123,23 +123,23 @@ class ClientReader final : public ClientStreamingInterface, CallOpBuffer buf; buf.AddSendMessage(request); buf.AddClientSendClose(); - call_.PerformOps(&buf, (void *)1); - cq_.Pluck((void *)1); + call_.PerformOps(&buf); + cq_.Pluck(&buf); } virtual bool Read(R *msg) override { CallOpBuffer buf; buf.AddRecvMessage(msg); - call_.PerformOps(&buf, (void *)2); - return cq_.Pluck((void *)2); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } virtual Status Finish() override { CallOpBuffer buf; Status status; buf.AddClientRecvStatus(&status); - call_.PerformOps(&buf, (void *)3); - GPR_ASSERT(cq_.Pluck((void *)3)); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); return status; } @@ -162,15 +162,15 @@ class ClientWriter final : public ClientStreamingInterface, virtual bool Write(const W& msg) override { CallOpBuffer buf; buf.AddSendMessage(msg); - call_.PerformOps(&buf, (void *)2); - return cq_.Pluck((void *)2); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } virtual bool WritesDone() { CallOpBuffer buf; buf.AddClientSendClose(); - call_.PerformOps(&buf, (void *)3); - return cq_.Pluck((void *)3); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } // Read the final response and wait for the final status. @@ -179,8 +179,8 @@ class ClientWriter final : public ClientStreamingInterface, Status status; buf.AddRecvMessage(response_); buf.AddClientRecvStatus(&status); - call_.PerformOps(&buf, (void *)4); - GPR_ASSERT(cq_.Pluck((void *)4)); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); return status; } @@ -204,30 +204,30 @@ class ClientReaderWriter final : public ClientStreamingInterface, virtual bool Read(R *msg) override { CallOpBuffer buf; buf.AddRecvMessage(msg); - call_.PerformOps(&buf, (void *)2); - return cq_.Pluck((void *)2); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } virtual bool Write(const W& msg) override { CallOpBuffer buf; buf.AddSendMessage(msg); - call_.PerformOps(&buf, (void *)3); - return cq_.Pluck((void *)3); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } virtual bool WritesDone() { CallOpBuffer buf; buf.AddClientSendClose(); - call_.PerformOps(&buf, (void *)4); - return cq_.Pluck((void *)4); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } virtual Status Finish() override { CallOpBuffer buf; Status status; buf.AddClientRecvStatus(&status); - call_.PerformOps(&buf, (void *)5); - GPR_ASSERT(cq_.Pluck((void *)5)); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); return status; } @@ -244,8 +244,8 @@ class ServerReader final : public ReaderInterface<R> { virtual bool Read(R* msg) override { CallOpBuffer buf; buf.AddRecvMessage(msg); - call_->PerformOps(&buf, (void *)2); - return call_->cq()->Pluck((void *)2); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } private: @@ -260,8 +260,8 @@ class ServerWriter final : public WriterInterface<W> { virtual bool Write(const W& msg) override { CallOpBuffer buf; buf.AddSendMessage(msg); - call_->PerformOps(&buf, (void *)2); - return call_->cq()->Pluck((void *)2); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } private: @@ -278,15 +278,15 @@ class ServerReaderWriter final : public WriterInterface<W>, virtual bool Read(R* msg) override { CallOpBuffer buf; buf.AddRecvMessage(msg); - call_->PerformOps(&buf, (void *)2); - return call_->cq()->Pluck((void *)2); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } virtual bool Write(const W& msg) override { CallOpBuffer buf; buf.AddSendMessage(msg); - call_->PerformOps(&buf, (void *)3); - return call_->cq()->Pluck((void *)3); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } private: @@ -333,19 +333,19 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface, CallOpBuffer buf; buf.AddSendMessage(request); buf.AddClientSendClose(); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } virtual void Read(R *msg, void* tag) override { CallOpBuffer buf; buf.AddRecvMessage(msg); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } virtual void Finish(Status* status, void* tag) override { CallOpBuffer buf; buf.AddClientRecvStatus(status); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } private: @@ -367,20 +367,20 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface, virtual void Write(const W& msg, void* tag) override { CallOpBuffer buf; buf.AddSendMessage(msg); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } virtual void WritesDone(void* tag) { CallOpBuffer buf; buf.AddClientSendClose(); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } virtual void Finish(Status* status, void* tag) override { CallOpBuffer buf; buf.AddRecvMessage(response_); buf.AddClientRecvStatus(status); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } private: @@ -402,25 +402,25 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, virtual void Read(R *msg, void* tag) override { CallOpBuffer buf; buf.AddRecvMessage(msg); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } virtual void Write(const W& msg, void* tag) override { CallOpBuffer buf; buf.AddSendMessage(msg); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } virtual void WritesDone(void* tag) { CallOpBuffer buf; buf.AddClientSendClose(); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } virtual void Finish(Status* status, void* tag) override { CallOpBuffer buf; buf.AddClientRecvStatus(status); - call_.PerformOps(&buf, tag); + call_.PerformOps(&buf); } private: @@ -437,7 +437,7 @@ class ServerAsyncResponseWriter final { virtual void Write(const W& msg, void* tag) override { CallOpBuffer buf; buf.AddSendMessage(msg); - call_->PerformOps(&buf, tag); + call_->PerformOps(&buf); } private: |