aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++/stream.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/grpc++/stream.h')
-rw-r--r--include/grpc++/stream.h719
1 files changed, 638 insertions, 81 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index b8982f4d93..be5b29589f 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -34,7 +34,12 @@
#ifndef __GRPCPP_STREAM_H__
#define __GRPCPP_STREAM_H__
-#include <grpc++/stream_context_interface.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/client_context.h>
+#include <grpc++/completion_queue.h>
+#include <grpc++/server_context.h>
+#include <grpc++/impl/call.h>
+#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
@@ -45,16 +50,12 @@ class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
- // Try to cancel the stream. Wait() still needs to be called to get the final
- // status. Cancelling after the stream has finished has no effects.
- virtual void Cancel() = 0;
-
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read(). Otherwise, this implicitly cancels the stream.
- virtual const Status& Wait() = 0;
+ virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
@@ -82,147 +83,703 @@ class WriterInterface {
};
template <class R>
-class ClientReader : public ClientStreamingInterface,
- public ReaderInterface<R> {
+class ClientReader final : public ClientStreamingInterface,
+ public ReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
- explicit ClientReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Write(context_->request(), true);
+ ClientReader(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const google::protobuf::Message& request)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&context->send_initial_metadata_);
+ buf.AddSendMessage(request);
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
}
- ~ClientReader() { delete context_; }
-
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ // Blocking wait for initial metadata from server. The received metadata
+ // can only be accessed after this call returns. Should only be called before
+ // the first read. Calling this method is optional, and if it is not called
+ // the metadata will be available in ClientContext after the first read.
+ void WaitForInitialMetadata() {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpBuffer buf;
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ context_->initial_metadata_received_ = true;
+ }
- virtual void Cancel() { context_->Cancel(); }
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ if (!context_->initial_metadata_received_) {
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf) && buf.got_message;
+ }
- virtual const Status& Wait() { return context_->Wait(); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
private:
- StreamContextInterface* const context_;
+ ClientContext* context_;
+ CompletionQueue cq_;
+ Call call_;
};
template <class W>
-class ClientWriter : public ClientStreamingInterface,
- public WriterInterface<W> {
+class ClientWriter final : public ClientStreamingInterface,
+ public WriterInterface<W> {
public:
// Blocking create a stream.
- explicit ClientWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ ClientWriter(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, google::protobuf::Message* response)
+ : context_(context),
+ response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
}
- ~ClientWriter() { delete context_; }
-
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
-
- virtual void Cancel() { context_->Cancel(); }
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
// Read the final response and wait for the final status.
- virtual const Status& Wait() {
- bool success = context_->Read(context_->response());
- if (!success) {
- Cancel();
- } else {
- success = context_->Read(nullptr);
- if (success) {
- Cancel();
- }
- }
- return context_->Wait();
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddRecvMessage(response_);
+ buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message);
+ return status;
}
private:
- StreamContextInterface* const context_;
+ ClientContext* context_;
+ google::protobuf::Message* const response_;
+ CompletionQueue cq_;
+ Call call_;
};
// Client-side interface for bi-directional streaming.
template <class W, class R>
-class ClientReaderWriter : public ClientStreamingInterface,
- public WriterInterface<W>,
- public ReaderInterface<R> {
+class ClientReaderWriter final : public ClientStreamingInterface,
+ public WriterInterface<W>,
+ public ReaderInterface<R> {
public:
// Blocking create a stream.
- explicit ClientReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
}
- ~ClientReaderWriter() { delete context_; }
+ // Blocking wait for initial metadata from server. The received metadata
+ // can only be accessed after this call returns. Should only be called before
+ // the first read. Calling this method is optional, and if it is not called
+ // the metadata will be available in ClientContext after the first read.
+ void WaitForInitialMetadata() {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpBuffer buf;
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ context_->initial_metadata_received_ = true;
+ }
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ if (!context_->initial_metadata_received_) {
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf) && buf.got_message;
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
- virtual void Cancel() { context_->Cancel(); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
- virtual const Status& Wait() { return context_->Wait(); }
+ private:
+ ClientContext* context_;
+ CompletionQueue cq_;
+ Call call_;
+};
+
+template <class R>
+class ServerReader final : public ReaderInterface<R> {
+ public:
+ ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&buf);
+ call_->cq()->Pluck(&buf);
+ }
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf) && buf.got_message;
+ }
private:
- StreamContextInterface* const context_;
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+template <class W>
+class ServerWriter final : public WriterInterface<W> {
+ public:
+ ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&buf);
+ call_->cq()->Pluck(&buf);
+ }
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ if (!ctx_->sent_initial_metadata_) {
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+// Server-side interface for bi-directional streaming.
+template <class W, class R>
+class ServerReaderWriter final : public WriterInterface<W>,
+ public ReaderInterface<R> {
+ public:
+ ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&buf);
+ call_->cq()->Pluck(&buf);
+ }
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf) && buf.got_message;
+ }
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ if (!ctx_->sent_initial_metadata_) {
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+// Async interfaces
+// Common interface for all client side streaming.
+class ClientAsyncStreamingInterface {
+ public:
+ virtual ~ClientAsyncStreamingInterface() {}
+
+ virtual void ReadInitialMetadata(void* tag) = 0;
+
+ virtual void Finish(Status* status, void* tag) = 0;
+};
+
+// An interface that yields a sequence of R messages.
+template <class R>
+class AsyncReaderInterface {
+ public:
+ virtual ~AsyncReaderInterface() {}
+
+ virtual void Read(R* msg, void* tag) = 0;
+};
+
+// An interface that can be fed a sequence of W messages.
+template <class W>
+class AsyncWriterInterface {
+ public:
+ virtual ~AsyncWriterInterface() {}
+
+ virtual void Write(const W& msg, void* tag) = 0;
};
template <class R>
-class ServerReader : public ReaderInterface<R> {
+class ClientAsyncReader final : public ClientAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ // Create a stream and write the first request out.
+ ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
+ const RpcMethod& method, ClientContext* context,
+ const google::protobuf::Message& request, void* tag)
+ : context_(context), call_(channel->CreateCall(method, context, cq)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
+ init_buf_.AddSendMessage(request);
+ init_buf_.AddClientSendClose();
+ call_.PerformOps(&init_buf_);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&meta_buf_);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
private:
- StreamContextInterface* const context_; // not owned
+ ClientContext* context_ = nullptr;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
};
template <class W>
-class ServerWriter : public WriterInterface<W> {
+class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
+ public AsyncWriterInterface<W> {
public:
- explicit ServerWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Read(context_->request());
+ ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
+ const RpcMethod& method, ClientContext* context,
+ google::protobuf::Message* response, void* tag)
+ : context_(context),
+ response_(response),
+ call_(channel->CreateCall(method, context, cq)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&init_buf_);
+ }
+
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&meta_buf_);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ void WritesDone(void* tag) {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ finish_buf_.AddRecvMessage(response_);
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ ClientContext* context_ = nullptr;
+ google::protobuf::Message* const response_;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
+ public:
+ ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
+ const RpcMethod& method, ClientContext* context,
+ void* tag)
+ : context_(context), call_(channel->CreateCall(method, context, cq)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&init_buf_);
+ }
+
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&meta_buf_);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ void WritesDone(void* tag) {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ ClientContext* context_ = nullptr;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+// TODO(yangg) Move out of stream.h
+template <class W>
+class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
+ public:
+ explicit ServerAsyncResponseWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
+ }
+
+ void Finish(const W& msg, const Status& status, void* tag) {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (status.IsOk()) {
+ finish_buf_.AddSendMessage(msg);
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ void FinishWithError(const Status& status, void* tag) {
+ GPR_ASSERT(!status.IsOk());
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ void BindCall(Call* call) override { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+template <class W, class R>
+class ServerAsyncReader : public ServerAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
+ public:
+ explicit ServerAsyncReader(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
+ }
+
+ void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ void Finish(const W& msg, const Status& status, void* tag) {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (status.IsOk()) {
+ finish_buf_.AddSendMessage(msg);
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ void FinishWithError(const Status& status, void* tag) {
+ GPR_ASSERT(!status.IsOk());
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ void BindCall(Call* call) override { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+template <class W>
+class ServerAsyncWriter : public ServerAsyncStreamingInterface,
+ public AsyncWriterInterface<W> {
+ public:
+ explicit ServerAsyncWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
+ }
+
+ void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ void Finish(const Status& status, void* tag) {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
}
private:
- StreamContextInterface* const context_; // not owned
+ void BindCall(Call* call) override { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer finish_buf_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ServerReaderWriter : public WriterInterface<W>,
- public ReaderInterface<R> {
+class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ explicit ServerAsyncReaderWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ void Finish(const Status& status, void* tag) {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
}
private:
- StreamContextInterface* const context_; // not owned
+ void BindCall(Call* call) override { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer finish_buf_;
};
} // namespace grpc