aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-09 16:12:36 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-09 16:12:36 -0800
commit8de8c646c5c231796e45ecb8e4302f1a3fc4004c (patch)
treedb89c170dc64dd03748bef64457ee7e1246ad1fa /include
parentb470169252c36a64749f6b7c60da36845c7fee1d (diff)
parent75ec2b191cbd94ff0eb3f1247d66e3991aa97000 (diff)
Merge pull request #3 from yang-g/c++api
more implementation and all async signatures
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/stream.h236
1 files changed, 184 insertions, 52 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 30af678c69..34604191e5 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -177,6 +177,7 @@ class ClientWriter final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
+ buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(&status);
call_.PerformOps(&buf, (void *)4);
GPR_ASSERT(cq_.Pluck((void *)4));
@@ -252,110 +253,241 @@ class ServerReader final : public ReaderInterface<R> {
};
template <class W>
-class ServerWriter : public WriterInterface<W> {
+class ServerWriter final : public WriterInterface<W> {
public:
- explicit ServerWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Read(context_->request());
- }
+ explicit ServerWriter(Call* call) : call_(call) {}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf, (void *)2);
+ return call_->cq()->Pluck((void *)2);
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ServerReaderWriter : public WriterInterface<W>,
+class ServerReaderWriter final : public WriterInterface<W>,
public ReaderInterface<R> {
public:
- explicit ServerReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ explicit ServerReaderWriter(Call* call) : call_(call) {}
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf, (void *)2);
+ return call_->cq()->Pluck((void *)2);
+ }
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf, (void *)3);
+ return call_->cq()->Pluck((void *)3);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ private:
+ CompletionQueue* cq_;
+ Call* call_;
+};
+
+// Async interfaces
+// Common interface for all client side streaming.
+class ClientAsyncStreamingInterface {
+ public:
+ virtual ~ClientAsyncStreamingInterface() {}
+
+ virtual void Finish(Status* status, void* tag) = 0;
+};
+
+// An interface that yields a sequence of R messages.
+template <class R>
+class AsyncReaderInterface {
+ public:
+ virtual ~AsyncReaderInterface() {}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Read(R* msg, void* tag) = 0;
+};
+
+// An interface that can be fed a sequence of W messages.
+template <class W>
+class AsyncWriterInterface {
+ public:
+ virtual ~Async WriterInterface() {}
+
+ virtual void Write(const W& msg, void* tag) = 0;
+};
+
+template <class R>
+class ClientAsyncReader final : public ClientAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
+ public:
+ // Blocking create a stream and write the first request out.
+ ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request, void* tag)
+ : call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendMessage(request);
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf, tag);
+ }
+
+ virtual void Read(R *msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf, tag);
+ }
+
+ virtual void Finish(Status* status, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddClientRecvStatus(status);
+ call_.PerformOps(&buf, tag);
}
private:
- StreamContextInterface* const context_; // not owned
+ CompletionQueue cq_;
+ Call call_;
};
template <class W>
-class ServerAsyncResponseWriter {
+class ClientWriter final : public ClientAsyncStreamingInterface,
+ public WriterInterface<W> {
public:
- explicit ServerAsyncResponseWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Read(context_->request());
+ // Blocking create a stream.
+ ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ google::protobuf::Message *response)
+ : response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual void Write(const W& msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf, tag);
}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void WritesDone(void* tag) {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf, tag);
+ }
+
+ virtual void Finish(Status* status, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(response_);
+ buf.AddClientRecvStatus(status);
+ call_.PerformOps(&buf, tag);
}
private:
- StreamContextInterface* const context_; // not owned
+ google::protobuf::Message *const response_;
+ CompletionQueue cq_;
+ Call call_;
};
-template <class R>
-class ServerAsyncReader : public ReaderInterface<R> {
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerAsyncReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ ClientAsyncReaderWriter(ChannelInterface *channel,
+ const RpcMethod &method, ClientContext *context)
+ : call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual void Read(R *msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf, tag);
+ }
+
+ virtual void Write(const W& msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf, tag);
+ }
+
+ virtual bool WritesDone(void* tag) {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf, tag);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void Finish(Status* status, void* tag) override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(status);
+ call_.PerformOps(&buf, tag);
+ }
private:
- StreamContextInterface* const context_; // not owned
+ CompletionQueue cq_;
+ Call call_;
};
+// TODO(yangg) Move out of stream.h
template <class W>
-class ServerAsyncWriter : public WriterInterface<W> {
+class ServerAsyncResponseWriter final {
public:
- explicit ServerAsyncWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Read(context_->request());
+ explicit ServerAsyncResponseWriter(Call* call) : call_(call) {}
+
+ virtual void Write(const W& msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf, tag);
}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ private:
+ Call* call_;
+};
+
+template <class R>
+class ServerAsyncReader : public AsyncReaderInterface<R> {
+ public:
+ explicit ServerAsyncReader(Call* call) : call_(call) {}
+
+ virtual void Read(R* msg, void* tag) {
+ // TODO
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
+};
+
+template <class W>
+class ServerAsyncWriter : public AsyncWriterInterface<W> {
+ public:
+ explicit ServerAsyncWriter(Call* call) : call_(call) {}
+
+ virtual void Write(const W& msg, void* tag) {
+ // TODO
+ }
+
+ private:
+ Call* call_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ServerAsyncReaderWriter : public WriterInterface<W>,
- public ReaderInterface<R> {
+class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerAsyncReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- }
+ explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void Read(R* msg, void* tag) {
+ // TODO
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) {
+ // TODO
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
};
} // namespace grpc