aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-02-12 14:57:17 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-02-12 14:57:17 -0800
commitd4ebeeb7fbd61031c9b3db013e07195f31013e89 (patch)
tree51133d03c724fbae1d0cca1266a9f292592b7f5a /include
parent3d6ceb646178ce7a5b0de38c38ba75da448fae39 (diff)
Async server streaming
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/stream.h58
1 files changed, 33 insertions, 25 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 2f37cc4075..23387e78ab 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -574,8 +574,8 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
template <class W>
class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
public:
- ServerAsyncResponseWriter(Call* call, ServerContext* ctx)
- : call_(call), ctx_(ctx) {}
+ explicit ServerAsyncResponseWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
@@ -583,7 +583,7 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
@@ -599,7 +599,7 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_->PerformOps(&finish_buf_);
+ call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
@@ -612,11 +612,13 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_->PerformOps(&finish_buf_);
+ call_.PerformOps(&finish_buf_);
}
private:
- Call* call_;
+ void BindCall(Call *call) override { call_ = *call; }
+
+ Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
@@ -626,8 +628,8 @@ template <class R>
class ServerAsyncReader : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
- ServerAsyncReader(Call* call, ServerContext* ctx)
- : call_(call), ctx_(ctx) {}
+ explicit ServerAsyncReader(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
@@ -635,13 +637,13 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface,
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_buf_);
}
void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
- call_->PerformOps(&read_buf_);
+ call_.PerformOps(&read_buf_);
}
void Finish(const Status& status, void* tag) override {
@@ -653,12 +655,14 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface,
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_->PerformOps(&finish_buf_);
+ call_.PerformOps(&finish_buf_);
}
private:
- Call* call_;
+ void BindCall(Call *call) override { call_ = *call; }
+
+ Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
@@ -669,8 +673,8 @@ template <class W>
class ServerAsyncWriter : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
- ServerAsyncWriter(Call* call, ServerContext* ctx)
- : call_(call), ctx_(ctx) {}
+ explicit ServerAsyncWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
@@ -678,7 +682,7 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface,
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_buf_);
}
void Write(const W& msg, void* tag) override {
@@ -688,7 +692,7 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
- call_->PerformOps(&write_buf_);
+ call_.PerformOps(&write_buf_);
}
void Finish(const Status& status, void* tag) override {
@@ -700,11 +704,13 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface,
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_->PerformOps(&finish_buf_);
+ call_.PerformOps(&finish_buf_);
}
private:
- Call* call_;
+ void BindCall(Call *call) override { call_ = *call; }
+
+ Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer write_buf_;
@@ -717,8 +723,8 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
public:
- ServerAsyncReaderWriter(Call* call, ServerContext* ctx)
- : call_(call), ctx_(ctx) {}
+ explicit ServerAsyncReaderWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
@@ -726,13 +732,13 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_buf_);
}
virtual void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
- call_->PerformOps(&read_buf_);
+ call_.PerformOps(&read_buf_);
}
virtual void Write(const W& msg, void* tag) override {
@@ -742,7 +748,7 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
- call_->PerformOps(&write_buf_);
+ call_.PerformOps(&write_buf_);
}
void Finish(const Status& status, void* tag) override {
@@ -754,11 +760,13 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_->PerformOps(&finish_buf_);
+ call_.PerformOps(&finish_buf_);
}
private:
- Call* call_;
+ void BindCall(Call *call) override { call_ = *call; }
+
+ Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;