From e1300deb87b5fca2b4361a753d0bd4d19b078ea4 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 27 Jan 2016 18:41:26 -0800 Subject: After GrpcLibrary refactoring. Compiles and passes. WIP still --- include/grpc++/support/sync_stream.h | 62 ++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 34 deletions(-) (limited to 'include/grpc++/support') diff --git a/include/grpc++/support/sync_stream.h b/include/grpc++/support/sync_stream.h index 77066ada62..3557ba5156 100644 --- a/include/grpc++/support/sync_stream.h +++ b/include/grpc++/support/sync_stream.h @@ -34,18 +34,18 @@ #ifndef GRPCXX_SUPPORT_SYNC_STREAM_H #define GRPCXX_SUPPORT_SYNC_STREAM_H -#include +#include +#include +#include +#include #include -#include -#include -#include -#include -#include +#include +#include +#include +#include namespace grpc { -class CompletionQueue; - /// Common interface for all synchronous client side streaming. class ClientStreamingInterface { public: @@ -121,9 +121,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { template ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), - cq_(new CompletionQueue), - call_(channel->CreateCall(method, context, cq_.get())) { + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpSet ops; ops.SendInitialMetadata(context->send_initial_metadata_); @@ -131,7 +129,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { GPR_ASSERT(ops.SendMessage(request).ok()); ops.ClientSendClose(); call_.PerformOps(&ops); - cq_->Pluck(&ops); + cq_.Pluck(&ops); } void WaitForInitialMetadata() GRPC_OVERRIDE { @@ -140,7 +138,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { CallOpSet ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); - cq_->Pluck(&ops); /// status ignored + cq_.Pluck(&ops); /// status ignored } bool Read(R* msg) GRPC_OVERRIDE { @@ -150,7 +148,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { } ops.RecvMessage(msg); call_.PerformOps(&ops); - return cq_->Pluck(&ops) && ops.got_message; + return cq_.Pluck(&ops) && ops.got_message; } Status Finish() GRPC_OVERRIDE { @@ -158,13 +156,13 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); - GPR_ASSERT(cq_->Pluck(&ops)); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } private: ClientContext* context_; - std::unique_ptr cq_; + CompletionQueue cq_; Call call_; }; @@ -187,15 +185,13 @@ class ClientWriter : public ClientWriterInterface { template ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) - : context_(context), - cq_(new CompletionQueue), - call_(channel->CreateCall(method, context, cq_.get())) { + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); CallOpSet ops; ops.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&ops); - cq_->Pluck(&ops); + cq_.Pluck(&ops); } using WriterInterface::Write; @@ -205,14 +201,14 @@ class ClientWriter : public ClientWriterInterface { return false; } call_.PerformOps(&ops); - return cq_->Pluck(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { CallOpSet ops; ops.ClientSendClose(); call_.PerformOps(&ops); - return cq_->Pluck(&ops); + return cq_.Pluck(&ops); } /// Read the final response and wait for the final status. @@ -220,14 +216,14 @@ class ClientWriter : public ClientWriterInterface { Status status; finish_ops_.ClientRecvStatus(context_, &status); call_.PerformOps(&finish_ops_); - GPR_ASSERT(cq_->Pluck(&finish_ops_)); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); return status; } private: ClientContext* context_; CallOpSet finish_ops_; - std::unique_ptr cq_; + CompletionQueue cq_; Call call_; }; @@ -255,13 +251,11 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { /// Blocking create a stream. ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) - : context_(context), - cq_(new CompletionQueue), - call_(channel->CreateCall(method, context, cq_.get())) { + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpSet ops; ops.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&ops); - cq_->Pluck(&ops); + cq_.Pluck(&ops); } void WaitForInitialMetadata() GRPC_OVERRIDE { @@ -270,7 +264,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { CallOpSet ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); - cq_->Pluck(&ops); // status ignored + cq_.Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { @@ -280,7 +274,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { } ops.RecvMessage(msg); call_.PerformOps(&ops); - return cq_->Pluck(&ops) && ops.got_message; + return cq_.Pluck(&ops) && ops.got_message; } using WriterInterface::Write; @@ -288,14 +282,14 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { CallOpSet ops; if (!ops.SendMessage(msg, options).ok()) return false; call_.PerformOps(&ops); - return cq_->Pluck(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { CallOpSet ops; ops.ClientSendClose(); call_.PerformOps(&ops); - return cq_->Pluck(&ops); + return cq_.Pluck(&ops); } Status Finish() GRPC_OVERRIDE { @@ -303,13 +297,13 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); - GPR_ASSERT(cq_->Pluck(&ops)); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } private: ClientContext* context_; - std::unique_ptr cq_; + CompletionQueue cq_; Call call_; }; -- cgit v1.2.3