aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpcpp
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-11-30 02:16:08 -0800
committerGravatar Vijay Pai <vpai@google.com>2018-11-30 04:13:40 -0800
commitea1156da3fb3d6fb8660d078e70cf5486fc71a65 (patch)
treea18b3bf4a8d718cf5fe7a2e67c8a951b08bcec5d /include/grpcpp
parentdac2066a1c0df628fefe4144ae0f97337af6324e (diff)
Stop exposing streaming object class
Diffstat (limited to 'include/grpcpp')
-rw-r--r--include/grpcpp/generic/generic_stub.h6
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h302
2 files changed, 194 insertions, 114 deletions
diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h
index ccbf8a0e55..eb014184e4 100644
--- a/include/grpcpp/generic/generic_stub.h
+++ b/include/grpcpp/generic/generic_stub.h
@@ -77,9 +77,9 @@ class GenericStub final {
const ByteBuffer* request, ByteBuffer* response,
std::function<void(Status)> on_completion);
- experimental::ClientCallbackReaderWriter<ByteBuffer, ByteBuffer>*
- PrepareBidiStreamingCall(ClientContext* context, const grpc::string& method,
- experimental::ClientBidiReactor* reactor);
+ void PrepareBidiStreamingCall(
+ ClientContext* context, const grpc::string& method,
+ experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor);
private:
GenericStub* stub_;
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
index 92a588b3c3..999c1c8a3e 100644
--- a/include/grpcpp/impl/codegen/client_callback.h
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -93,9 +93,67 @@ class CallbackUnaryCallImpl {
namespace experimental {
+// Forward declarations
+template <class Request, class Response>
+class ClientBidiReactor;
+template <class Response>
+class ClientReadReactor;
+template <class Request>
+class ClientWriteReactor;
+
+// NOTE: The streaming objects are not actually implemented in the public API.
+// These interfaces are provided for mocking only. Typical applications
+// will interact exclusively with the reactors that they define.
+template <class Request, class Response>
+class ClientCallbackReaderWriter {
+ public:
+ virtual ~ClientCallbackReaderWriter() {}
+ virtual void StartCall() = 0;
+ virtual void Write(const Request* req, WriteOptions options) = 0;
+ virtual void WritesDone() = 0;
+ virtual void Read(Response* resp) = 0;
+
+ protected:
+ void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
+ reactor->BindStream(this);
+ }
+};
+
+template <class Response>
+class ClientCallbackReader {
+ public:
+ virtual ~ClientCallbackReader() {}
+ virtual void StartCall() = 0;
+ virtual void Read(Response* resp) = 0;
+
+ protected:
+ void BindReactor(ClientReadReactor<Response>* reactor) {
+ reactor->BindReader(this);
+ }
+};
+
+template <class Request>
+class ClientCallbackWriter {
+ public:
+ virtual ~ClientCallbackWriter() {}
+ virtual void StartCall() = 0;
+ void Write(const Request* req) { Write(req, WriteOptions()); }
+ virtual void Write(const Request* req, WriteOptions options) = 0;
+ void WriteLast(const Request* req, WriteOptions options) {
+ Write(req, options.set_last_message());
+ }
+ virtual void WritesDone() = 0;
+
+ protected:
+ void BindReactor(ClientWriteReactor<Request>* reactor) {
+ reactor->BindWriter(this);
+ }
+};
+
// The user must implement this reactor interface with reactions to each event
// type that gets called by the library. An empty reaction is provided by
// default
+template <class Request, class Response>
class ClientBidiReactor {
public:
virtual ~ClientBidiReactor() {}
@@ -104,16 +162,44 @@ class ClientBidiReactor {
virtual void OnReadDone(bool ok) {}
virtual void OnWriteDone(bool ok) {}
virtual void OnWritesDoneDone(bool ok) {}
+
+ void StartCall() { stream_->StartCall(); }
+ void StartRead(Response* resp) { stream_->Read(resp); }
+ void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
+ void StartWrite(const Request* req, WriteOptions options) {
+ stream_->Write(req, std::move(options));
+ }
+ void StartWriteLast(const Request* req, WriteOptions options) {
+ StartWrite(req, std::move(options.set_last_message()));
+ }
+ void StartWritesDone() { stream_->WritesDone(); }
+
+ private:
+ friend class ClientCallbackReaderWriter<Request, Response>;
+ void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
+ stream_ = stream;
+ }
+ ClientCallbackReaderWriter<Request, Response>* stream_;
};
+template <class Response>
class ClientReadReactor {
public:
virtual ~ClientReadReactor() {}
virtual void OnDone(Status s) {}
virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnReadDone(bool ok) {}
+
+ void StartCall() { reader_->StartCall(); }
+ void StartRead(Response* resp) { reader_->Read(resp); }
+
+ private:
+ friend class ClientCallbackReader<Response>;
+ void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
+ ClientCallbackReader<Response>* reader_;
};
+template <class Request>
class ClientWriteReactor {
public:
virtual ~ClientWriteReactor() {}
@@ -121,41 +207,21 @@ class ClientWriteReactor {
virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnWriteDone(bool ok) {}
virtual void OnWritesDoneDone(bool ok) {}
-};
-template <class Request, class Response>
-class ClientCallbackReaderWriter {
- public:
- virtual ~ClientCallbackReaderWriter() {}
- virtual void StartCall() = 0;
- void Write(const Request* req) { Write(req, WriteOptions()); }
- virtual void Write(const Request* req, WriteOptions options) = 0;
- void WriteLast(const Request* req, WriteOptions options) {
- Write(req, options.set_last_message());
+ void StartCall() { writer_->StartCall(); }
+ void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
+ void StartWrite(const Request* req, WriteOptions options) {
+ writer_->Write(req, std::move(options));
}
- virtual void WritesDone() = 0;
- virtual void Read(Response* resp) = 0;
-};
-
-template <class Response>
-class ClientCallbackReader {
- public:
- virtual ~ClientCallbackReader() {}
- virtual void StartCall() = 0;
- virtual void Read(Response* resp) = 0;
-};
-
-template <class Request>
-class ClientCallbackWriter {
- public:
- virtual ~ClientCallbackWriter() {}
- virtual void StartCall() = 0;
- void Write(const Request* req) { Write(req, WriteOptions()); }
- virtual void Write(const Request* req, WriteOptions options) = 0;
- void WriteLast(const Request* req, WriteOptions options) {
- Write(req, options.set_last_message());
+ void StartWriteLast(const Request* req, WriteOptions options) {
+ StartWrite(req, std::move(options.set_last_message()));
}
- virtual void WritesDone() = 0;
+ void StartWritesDone() { writer_->WritesDone(); }
+
+ private:
+ friend class ClientCallbackWriter<Request>;
+ void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
+ ClientCallbackWriter<Request>* writer_;
};
} // namespace experimental
@@ -204,12 +270,13 @@ class ClientCallbackReaderWriterImpl
// 4. Any write backlog
started_ = true;
- start_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnReadInitialMetadataDone(ok);
- MaybeFinish();
- },
- &start_ops_);
+ start_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
@@ -220,27 +287,29 @@ class ClientCallbackReaderWriterImpl
// Also set up the read and write tags so that they don't have to be set up
// each time
- write_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnWriteDone(ok);
- MaybeFinish();
- },
- &write_ops_);
+ write_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeFinish();
+ },
+ &write_ops_);
write_ops_.set_core_cq_tag(&write_tag_);
- read_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnReadDone(ok);
- MaybeFinish();
- },
- &read_ops_);
+ read_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeFinish();
+ },
+ &read_ops_);
read_ops_.set_core_cq_tag(&read_tag_);
if (read_ops_at_start_) {
call_.PerformOps(&read_ops_);
}
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
+ finish_tag_.Set(
+ call_.call(), [this](bool ok) { MaybeFinish(); }, &finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
@@ -291,12 +360,13 @@ class ClientCallbackReaderWriterImpl
start_corked_ = false;
}
writes_done_ops_.ClientSendClose();
- writes_done_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnWritesDoneDone(ok);
- MaybeFinish();
- },
- &writes_done_ops_);
+ writes_done_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnWritesDoneDone(ok);
+ MaybeFinish();
+ },
+ &writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++;
if (started_) {
@@ -311,15 +381,17 @@ class ClientCallbackReaderWriterImpl
ClientCallbackReaderWriterImpl(
Call call, ClientContext* context,
- ::grpc::experimental::ClientBidiReactor* reactor)
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor)
: context_(context),
call_(call),
reactor_(reactor),
- start_corked_(context_->initial_metadata_corked_) {}
+ start_corked_(context_->initial_metadata_corked_) {
+ this->BindReactor(reactor);
+ }
ClientContext* context_;
Call call_;
- ::grpc::experimental::ClientBidiReactor* reactor_;
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
CallbackWithSuccessTag start_tag_;
@@ -350,14 +422,14 @@ class ClientCallbackReaderWriterImpl
template <class Request, class Response>
class ClientCallbackReaderWriterFactory {
public:
- static experimental::ClientCallbackReaderWriter<Request, Response>* Create(
+ static void Create(
ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
ClientContext* context,
- ::grpc::experimental::ClientBidiReactor* reactor) {
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor) {
Call call = channel->CreateCall(method, context, channel->CallbackCQ());
g_core_codegen_interface->grpc_call_ref(call.call());
- return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
ClientCallbackReaderWriterImpl<Request, Response>(call, context,
reactor);
@@ -396,12 +468,13 @@ class ClientCallbackReaderImpl
// 3. Recv trailing metadata, on_completion callback
started_ = true;
- start_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnReadInitialMetadataDone(ok);
- MaybeFinish();
- },
- &start_ops_);
+ start_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_ops_.RecvInitialMetadata(context_);
@@ -409,19 +482,20 @@ class ClientCallbackReaderImpl
call_.PerformOps(&start_ops_);
// Also set up the read tag so it doesn't have to be set up each time
- read_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnReadDone(ok);
- MaybeFinish();
- },
- &read_ops_);
+ read_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeFinish();
+ },
+ &read_ops_);
read_ops_.set_core_cq_tag(&read_tag_);
if (read_ops_at_start_) {
call_.PerformOps(&read_ops_);
}
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
+ finish_tag_.Set(
+ call_.call(), [this](bool ok) { MaybeFinish(); }, &finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
@@ -441,9 +515,11 @@ class ClientCallbackReaderImpl
friend class ClientCallbackReaderFactory<Response>;
template <class Request>
- ClientCallbackReaderImpl(Call call, ClientContext* context, Request* request,
- ::grpc::experimental::ClientReadReactor* reactor)
+ ClientCallbackReaderImpl(
+ Call call, ClientContext* context, Request* request,
+ ::grpc::experimental::ClientReadReactor<Response>* reactor)
: context_(context), call_(call), reactor_(reactor) {
+ this->BindReactor(reactor);
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok());
start_ops_.ClientSendClose();
@@ -451,7 +527,7 @@ class ClientCallbackReaderImpl
ClientContext* context_;
Call call_;
- ::grpc::experimental::ClientReadReactor* reactor_;
+ ::grpc::experimental::ClientReadReactor<Response>* reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose,
CallOpRecvInitialMetadata>
@@ -475,14 +551,14 @@ template <class Response>
class ClientCallbackReaderFactory {
public:
template <class Request>
- static experimental::ClientCallbackReader<Response>* Create(
+ static void Create(
ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
ClientContext* context, const Request* request,
- ::grpc::experimental::ClientReadReactor* reactor) {
+ ::grpc::experimental::ClientReadReactor<Response>* reactor) {
Call call = channel->CreateCall(method, context, channel->CallbackCQ());
g_core_codegen_interface->grpc_call_ref(call.call());
- return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
ClientCallbackReaderImpl<Response>(call, context, request, reactor);
}
@@ -520,12 +596,13 @@ class ClientCallbackWriterImpl
// 3. Any backlog
started_ = true;
- start_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnReadInitialMetadataDone(ok);
- MaybeFinish();
- },
- &start_ops_);
+ start_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
@@ -536,16 +613,17 @@ class ClientCallbackWriterImpl
// Also set up the read and write tags so that they don't have to be set up
// each time
- write_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnWriteDone(ok);
- MaybeFinish();
- },
- &write_ops_);
+ write_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeFinish();
+ },
+ &write_ops_);
write_ops_.set_core_cq_tag(&write_tag_);
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
+ finish_tag_.Set(
+ call_.call(), [this](bool ok) { MaybeFinish(); }, &finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
@@ -586,12 +664,13 @@ class ClientCallbackWriterImpl
start_corked_ = false;
}
writes_done_ops_.ClientSendClose();
- writes_done_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_->OnWritesDoneDone(ok);
- MaybeFinish();
- },
- &writes_done_ops_);
+ writes_done_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ reactor_->OnWritesDoneDone(ok);
+ MaybeFinish();
+ },
+ &writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++;
if (started_) {
@@ -605,20 +684,21 @@ class ClientCallbackWriterImpl
friend class ClientCallbackWriterFactory<Request>;
template <class Response>
- ClientCallbackWriterImpl(Call call, ClientContext* context,
- Response* response,
- ::grpc::experimental::ClientWriteReactor* reactor)
+ ClientCallbackWriterImpl(
+ Call call, ClientContext* context, Response* response,
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor)
: context_(context),
call_(call),
reactor_(reactor),
start_corked_(context_->initial_metadata_corked_) {
+ this->BindReactor(reactor);
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
}
ClientContext* context_;
Call call_;
- ::grpc::experimental::ClientWriteReactor* reactor_;
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
CallbackWithSuccessTag start_tag_;
@@ -646,14 +726,14 @@ template <class Request>
class ClientCallbackWriterFactory {
public:
template <class Response>
- static experimental::ClientCallbackWriter<Request>* Create(
+ static void Create(
ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
ClientContext* context, Response* response,
- ::grpc::experimental::ClientWriteReactor* reactor) {
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor) {
Call call = channel->CreateCall(method, context, channel->CallbackCQ());
g_core_codegen_interface->grpc_call_ref(call.call());
- return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
ClientCallbackWriterImpl<Request>(call, context, response, reactor);
}