aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++/stream.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/grpc++/stream.h')
-rw-r--r--include/grpc++/stream.h408
1 files changed, 325 insertions, 83 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index b8982f4d93..c30825a7a5 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -34,7 +34,9 @@
#ifndef __GRPCPP_STREAM_H__
#define __GRPCPP_STREAM_H__
-#include <grpc++/stream_context_interface.h>
+#include <grpc++/call.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/completion_queue.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
@@ -45,16 +47,12 @@ class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
- // Try to cancel the stream. Wait() still needs to be called to get the final
- // status. Cancelling after the stream has finished has no effects.
- virtual void Cancel() = 0;
-
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read(). Otherwise, this implicitly cancels the stream.
- virtual const Status& Wait() = 0;
+ virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
@@ -82,147 +80,391 @@ class WriterInterface {
};
template <class R>
-class ClientReader : public ClientStreamingInterface,
- public ReaderInterface<R> {
+class ClientReader final : public ClientStreamingInterface,
+ public ReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
- explicit ClientReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Write(context_->request(), true);
+ ClientReader(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request)
+ : call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendMessage(request);
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
}
- ~ClientReader() { delete context_; }
+ virtual bool Read(R *msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
- virtual void Cancel() { context_->Cancel(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+};
- virtual const Status& Wait() { return context_->Wait(); }
+template <class W>
+class ClientWriter final : public ClientStreamingInterface,
+ public WriterInterface<W> {
+ public:
+ // Blocking create a stream.
+ ClientWriter(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ google::protobuf::Message *response)
+ : response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
+
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
+
+ // Read the final response and wait for the final status.
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddRecvMessage(response_);
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
private:
- StreamContextInterface* const context_;
+ google::protobuf::Message *const response_;
+ CompletionQueue cq_;
+ Call call_;
};
-template <class W>
-class ClientWriter : public ClientStreamingInterface,
- public WriterInterface<W> {
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientReaderWriter final : public ClientStreamingInterface,
+ public WriterInterface<W>,
+ public ReaderInterface<R> {
public:
// Blocking create a stream.
- explicit ClientWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ ClientReaderWriter(ChannelInterface *channel,
+ const RpcMethod &method, ClientContext *context)
+ : call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual bool Read(R *msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- ~ClientWriter() { delete context_; }
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
- virtual void Cancel() { context_->Cancel(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+};
- // Read the final response and wait for the final status.
- virtual const Status& Wait() {
- bool success = context_->Read(context_->response());
- if (!success) {
- Cancel();
- } else {
- success = context_->Read(nullptr);
- if (success) {
- Cancel();
- }
- }
- return context_->Wait();
+template <class R>
+class ServerReader final : public ReaderInterface<R> {
+ public:
+ explicit ServerReader(Call* call) : call_(call) {}
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
private:
- StreamContextInterface* const context_;
+ Call* call_;
};
-// Client-side interface for bi-directional streaming.
+template <class W>
+class ServerWriter final : public WriterInterface<W> {
+ public:
+ explicit ServerWriter(Call* call) : call_(call) {}
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ Call* call_;
+};
+
+// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ClientReaderWriter : public ClientStreamingInterface,
- public WriterInterface<W>,
+class ServerReaderWriter final : public WriterInterface<W>,
public ReaderInterface<R> {
public:
- // Blocking create a stream.
- explicit ClientReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ explicit ServerReaderWriter(Call* call) : call_(call) {}
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
- ~ClientReaderWriter() { delete context_; }
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ CompletionQueue* cq_;
+ Call* call_;
+};
+
+// Async interfaces
+// Common interface for all client side streaming.
+class ClientAsyncStreamingInterface {
+ public:
+ virtual ~ClientAsyncStreamingInterface() {}
+
+ virtual void Finish(Status* status, void* tag) = 0;
+};
+
+// An interface that yields a sequence of R messages.
+template <class R>
+class AsyncReaderInterface {
+ public:
+ virtual ~AsyncReaderInterface() {}
+
+ virtual void Read(R* msg, void* tag) = 0;
+};
- virtual bool Read(R* msg) { return context_->Read(msg); }
+// An interface that can be fed a sequence of W messages.
+template <class W>
+class AsyncWriterInterface {
+ public:
+ virtual ~AsyncWriterInterface() {}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) = 0;
+};
+
+template <class R>
+class ClientAsyncReader final : public ClientAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
+ public:
+ // Blocking create a stream and write the first request out.
+ ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request, void* tag)
+ : call_(channel->CreateCall(method, context, &cq_)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendMessage(request);
+ init_buf_.AddClientSendClose();
+ call_.PerformOps(&init_buf_);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
+ virtual void Read(R *msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
- virtual void Cancel() { context_->Cancel(); }
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
- virtual const Status& Wait() { return context_->Wait(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+template <class W>
+class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
+ public WriterInterface<W> {
+ public:
+ // Blocking create a stream.
+ ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ google::protobuf::Message *response)
+ : response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ virtual void WritesDone(void* tag) override {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddRecvMessage(response_);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
private:
- StreamContextInterface* const context_;
+ google::protobuf::Message *const response_;
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
};
-template <class R>
-class ServerReader : public ReaderInterface<R> {
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ ClientAsyncReaderWriter(ChannelInterface *channel,
+ const RpcMethod &method, ClientContext *context)
+ : call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual void Read(R *msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ virtual void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void WritesDone(void* tag) override {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
private:
- StreamContextInterface* const context_; // not owned
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
};
+// TODO(yangg) Move out of stream.h
template <class W>
-class ServerWriter : public WriterInterface<W> {
+class ServerAsyncResponseWriter final {
public:
- explicit ServerWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Read(context_->request());
+ explicit ServerAsyncResponseWriter(Call* call) : call_(call) {}
+
+ virtual void Write(const W& msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ private:
+ Call* call_;
+};
+
+template <class R>
+class ServerAsyncReader : public AsyncReaderInterface<R> {
+ public:
+ explicit ServerAsyncReader(Call* call) : call_(call) {}
+
+ virtual void Read(R* msg, void* tag) {
+ // TODO
+ }
+
+ private:
+ Call* call_;
+};
+
+template <class W>
+class ServerAsyncWriter : public AsyncWriterInterface<W> {
+ public:
+ explicit ServerAsyncWriter(Call* call) : call_(call) {}
+
+ virtual void Write(const W& msg, void* tag) {
+ // TODO
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ServerReaderWriter : public WriterInterface<W>,
- public ReaderInterface<R> {
+class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- }
+ explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void Read(R* msg, void* tag) {
+ // TODO
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) {
+ // TODO
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
};
} // namespace grpc