diff options
author | David Garcia Quintas <dgq@google.com> | 2016-01-27 18:41:26 -0800 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-01-27 18:41:26 -0800 |
commit | e1300deb87b5fca2b4361a753d0bd4d19b078ea4 (patch) | |
tree | 62016bec7621d6f1650472c20d56fdbf713f077c /include/grpc++/support | |
parent | 6a48405ed003a416bd574d3f480b20e3dee9e9df (diff) |
After GrpcLibrary refactoring. Compiles and passes. WIP still
Diffstat (limited to 'include/grpc++/support')
-rw-r--r-- | include/grpc++/support/sync_stream.h | 62 |
1 files changed, 28 insertions, 34 deletions
diff --git a/include/grpc++/support/sync_stream.h b/include/grpc++/support/sync_stream.h index 77066ada62..3557ba5156 100644 --- a/include/grpc++/support/sync_stream.h +++ b/include/grpc++/support/sync_stream.h @@ -34,18 +34,18 @@ #ifndef GRPCXX_SUPPORT_SYNC_STREAM_H #define GRPCXX_SUPPORT_SYNC_STREAM_H -#include <grpc/impl/codegen/log.h> +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/completion_queue.h> +#include <grpc++/impl/call.h> #include <grpc++/impl/codegen/channel_interface.h> -#include <grpc++/impl/codegen/client_context.h> -#include <grpc++/impl/codegen/call.h> -#include <grpc++/impl/codegen/service_type.h> -#include <grpc++/impl/codegen/server_context.h> -#include <grpc++/impl/codegen/status.h> +#include <grpc++/impl/service_type.h> +#include <grpc++/server_context.h> +#include <grpc++/support/status.h> +#include <grpc/support/log.h> namespace grpc { -class CompletionQueue; - /// Common interface for all synchronous client side streaming. class ClientStreamingInterface { public: @@ -121,9 +121,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { template <class W> ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), - cq_(new CompletionQueue), - call_(channel->CreateCall(method, context, cq_.get())) { + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops; ops.SendInitialMetadata(context->send_initial_metadata_); @@ -131,7 +129,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { GPR_ASSERT(ops.SendMessage(request).ok()); ops.ClientSendClose(); call_.PerformOps(&ops); - cq_->Pluck(&ops); + cq_.Pluck(&ops); } void WaitForInitialMetadata() GRPC_OVERRIDE { @@ -140,7 +138,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { CallOpSet<CallOpRecvInitialMetadata> ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); - cq_->Pluck(&ops); /// status ignored + cq_.Pluck(&ops); /// status ignored } bool Read(R* msg) GRPC_OVERRIDE { @@ -150,7 +148,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { } ops.RecvMessage(msg); call_.PerformOps(&ops); - return cq_->Pluck(&ops) && ops.got_message; + return cq_.Pluck(&ops) && ops.got_message; } Status Finish() GRPC_OVERRIDE { @@ -158,13 +156,13 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); - GPR_ASSERT(cq_->Pluck(&ops)); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } private: ClientContext* context_; - std::unique_ptr<CompletionQueue> cq_; + CompletionQueue cq_; Call call_; }; @@ -187,15 +185,13 @@ class ClientWriter : public ClientWriterInterface<W> { template <class R> ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) - : context_(context), - cq_(new CompletionQueue), - call_(channel->CreateCall(method, context, cq_.get())) { + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&ops); - cq_->Pluck(&ops); + cq_.Pluck(&ops); } using WriterInterface<W>::Write; @@ -205,14 +201,14 @@ class ClientWriter : public ClientWriterInterface<W> { return false; } call_.PerformOps(&ops); - return cq_->Pluck(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { CallOpSet<CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); - return cq_->Pluck(&ops); + return cq_.Pluck(&ops); } /// Read the final response and wait for the final status. @@ -220,14 +216,14 @@ class ClientWriter : public ClientWriterInterface<W> { Status status; finish_ops_.ClientRecvStatus(context_, &status); call_.PerformOps(&finish_ops_); - GPR_ASSERT(cq_->Pluck(&finish_ops_)); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); return status; } private: ClientContext* context_; CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; - std::unique_ptr<CompletionQueue> cq_; + CompletionQueue cq_; Call call_; }; @@ -255,13 +251,11 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { /// Blocking create a stream. ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) - : context_(context), - cq_(new CompletionQueue), - call_(channel->CreateCall(method, context, cq_.get())) { + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&ops); - cq_->Pluck(&ops); + cq_.Pluck(&ops); } void WaitForInitialMetadata() GRPC_OVERRIDE { @@ -270,7 +264,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { CallOpSet<CallOpRecvInitialMetadata> ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); - cq_->Pluck(&ops); // status ignored + cq_.Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { @@ -280,7 +274,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { } ops.RecvMessage(msg); call_.PerformOps(&ops); - return cq_->Pluck(&ops) && ops.got_message; + return cq_.Pluck(&ops) && ops.got_message; } using WriterInterface<W>::Write; @@ -288,14 +282,14 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { CallOpSet<CallOpSendMessage> ops; if (!ops.SendMessage(msg, options).ok()) return false; call_.PerformOps(&ops); - return cq_->Pluck(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { CallOpSet<CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); - return cq_->Pluck(&ops); + return cq_.Pluck(&ops); } Status Finish() GRPC_OVERRIDE { @@ -303,13 +297,13 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); - GPR_ASSERT(cq_->Pluck(&ops)); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } private: ClientContext* context_; - std::unique_ptr<CompletionQueue> cq_; + CompletionQueue cq_; Call call_; }; |