diff options
-rw-r--r-- | examples/cpp/helloworld/greeter_async_client.cc | 12 | ||||
-rw-r--r-- | examples/cpp/helloworld/greeter_async_client2.cc | 14 | ||||
-rw-r--r-- | include/grpc++/generic/generic_stub.h | 7 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 136 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 41 | ||||
-rw-r--r-- | src/compiler/cpp_generator.cc | 578 | ||||
-rw-r--r-- | src/cpp/client/generic_stub.cc | 23 | ||||
-rw-r--r-- | test/cpp/codegen/compiler_test_golden | 40 | ||||
-rw-r--r-- | test/cpp/codegen/compiler_test_mock_golden | 5 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 153 |
10 files changed, 651 insertions, 358 deletions
diff --git a/examples/cpp/helloworld/greeter_async_client.cc b/examples/cpp/helloworld/greeter_async_client.cc index 64da044ea6..ddf6c1aaf3 100644 --- a/examples/cpp/helloworld/greeter_async_client.cc +++ b/examples/cpp/helloworld/greeter_async_client.cc @@ -60,11 +60,15 @@ class GreeterClient { // Storage for the status of the RPC upon completion. Status status; - // stub_->AsyncSayHello() performs the RPC call, returning an instance we - // store in "rpc". Because we are using the asynchronous API, we need to - // hold on to the "rpc" instance in order to get updates on the ongoing RPC. + // stub_->PrepareAsyncSayHello() creates an RPC object, returning + // an instance to store in "call" but does not actually start the RPC + // Because we are using the asynchronous API, we need to hold on to + // the "call" instance in order to get updates on the ongoing RPC. std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc( - stub_->AsyncSayHello(&context, request, &cq)); + stub_->PrepareAsyncSayHello(&context, request, &cq)); + + // StartCall initiates the RPC call + rpc->StartCall(); // Request that, upon completion of the RPC, "reply" be updated with the // server's response; "status" with the indication of whether the operation diff --git a/examples/cpp/helloworld/greeter_async_client2.cc b/examples/cpp/helloworld/greeter_async_client2.cc index 4192546bef..3154e84d85 100644 --- a/examples/cpp/helloworld/greeter_async_client2.cc +++ b/examples/cpp/helloworld/greeter_async_client2.cc @@ -49,11 +49,15 @@ class GreeterClient { // Call object to store rpc data AsyncClientCall* call = new AsyncClientCall; - // stub_->AsyncSayHello() performs the RPC call, returning an instance to - // store in "call". Because we are using the asynchronous API, we need to - // hold on to the "call" instance in order to get updates on the ongoing RPC. - call->response_reader = stub_->AsyncSayHello(&call->context, request, &cq_); - + // stub_->PrepareAsyncSayHello() creates an RPC object, returning + // an instance to store in "call" but does not actually start the RPC + // Because we are using the asynchronous API, we need to hold on to + // the "call" instance in order to get updates on the ongoing RPC. + call->response_reader = + stub_->PrepareAsyncSayHello(&call->context, request, &cq_); + + // StartCall initiates the RPC call + call->response_reader->StartCall(); // Request that, upon completion of the RPC, "reply" be updated with the // server's response; "status" with the indication of whether the operation diff --git a/include/grpc++/generic/generic_stub.h b/include/grpc++/generic/generic_stub.h index 7b269d0887..2b3ff59ea2 100644 --- a/include/grpc++/generic/generic_stub.h +++ b/include/grpc++/generic/generic_stub.h @@ -44,6 +44,13 @@ class GenericStub final { ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag); + /// Setup a call to a named method \a method using \a context, but don't + /// start it. Let it be started explicitly with StartCall and a tag. + /// The return value only indicates whether or not registration of the call + /// succeeded (i.e. the call won't proceed if the return value is nullptr). + std::unique_ptr<GenericClientAsyncReaderWriter> PrepareCall( + ClientContext* context, const grpc::string& method, CompletionQueue* cq); + private: std::shared_ptr<ChannelInterface> channel_; }; diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 9cf7ac30dd..e60572fc93 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -35,6 +35,11 @@ class ClientAsyncStreamingInterface { public: virtual ~ClientAsyncStreamingInterface() {} + /// Start the call that was set up by the constructor, but only if the + /// constructor was invoked through the "Prepare" API which doesn't actually + /// start the call + virtual void StartCall(void* tag) = 0; + /// Request notification of the reading of the initial metadata. Completion /// will be notified by \a tag on the associated completion queue. /// This call is optional, but if it is used, it cannot be used concurrently @@ -156,20 +161,22 @@ class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, template <class R> class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { public: - /// Create a stream and write the first request out. + /// Create a stream object. + /// Write the first request out if \a start is set. /// \a tag will be notified on \a cq when the call has been started and - /// \a request has been written out. + /// \a request has been written out. If \a start is not set, \a tag must be + /// nullptr and the actual call must be initiated by StartCall /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. template <class W> static ClientAsyncReader* Create(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request, - void* tag) { + bool start, void* tag) { Call call = channel->CreateCall(method, context, cq); return new (g_core_codegen_interface->grpc_call_arena_alloc( call.call(), sizeof(ClientAsyncReader))) - ClientAsyncReader(call, context, request, tag); + ClientAsyncReader(call, context, request, start, tag); } // always allocated against a call arena, no memory free required @@ -177,6 +184,12 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { assert(size == sizeof(ClientAsyncReader)); } + void StartCall(void* tag) override { + assert(!started_); + started_ = true; + StartCallInternal(tag); + } + /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata /// method for semantics. /// @@ -186,6 +199,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { /// calling code can access the received metadata through the /// \a ClientContext. void ReadInitialMetadata(void* tag) override { + assert(started_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); @@ -194,6 +208,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { } void Read(R* msg, void* tag) override { + assert(started_); read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { read_ops_.RecvInitialMetadata(context_); @@ -208,6 +223,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata received from the server. void Finish(Status* status, void* tag) override { + assert(started_); finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); @@ -219,19 +235,28 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { private: template <class W> ClientAsyncReader(Call call, ClientContext* context, const W& request, - void* tag) - : context_(context), call_(call) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); + bool start, void* tag) + : context_(context), call_(call), started_(start) { // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); init_ops_.ClientSendClose(); + if (start) { + StartCallInternal(tag); + } else { + assert(tag == nullptr); + } + } + + void StartCallInternal(void* tag) { + init_ops_.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + init_ops_.set_output_tag(tag); call_.PerformOps(&init_ops_); } ClientContext* context_; Call call_; + bool started_; CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; @@ -257,9 +282,12 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, template <class W> class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { public: - /// Create a stream and write the first request out. + /// Create a stream object. + /// Start the RPC if \a start is set /// \a tag will be notified on \a cq when the call has been started (i.e. /// intitial metadata sent) and \a request has been written out. + /// If \a start is not set, \a tag must be nullptr and the actual call + /// must be initiated by StartCall /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. /// \a response will be filled in with the single expected response @@ -269,11 +297,11 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { static ClientAsyncWriter* Create(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, R* response, - void* tag) { + bool start, void* tag) { Call call = channel->CreateCall(method, context, cq); return new (g_core_codegen_interface->grpc_call_arena_alloc( call.call(), sizeof(ClientAsyncWriter))) - ClientAsyncWriter(call, context, response, tag); + ClientAsyncWriter(call, context, response, start, tag); } // always allocated against a call arena, no memory free required @@ -281,6 +309,12 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { assert(size == sizeof(ClientAsyncWriter)); } + void StartCall(void* tag) override { + assert(!started_); + started_ = true; + StartCallInternal(tag); + } + /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for /// semantics. /// @@ -289,6 +323,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { /// associated with this call is updated, and the calling code can access /// the received metadata through the \a ClientContext. void ReadInitialMetadata(void* tag) override { + assert(started_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); @@ -297,6 +332,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { } void Write(const W& msg, void* tag) override { + assert(started_); write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); @@ -304,6 +340,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { } void Write(const W& msg, WriteOptions options, void* tag) override { + assert(started_); write_ops_.set_output_tag(tag); if (options.is_last_message()) { options.set_buffer_hint(); @@ -315,6 +352,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { } void WritesDone(void* tag) override { + assert(started_); write_ops_.set_output_tag(tag); write_ops_.ClientSendClose(); call_.PerformOps(&write_ops_); @@ -328,6 +366,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { /// - attempts to fill in the \a response parameter passed to this class's /// constructor with the server's response message. void Finish(Status* status, void* tag) override { + assert(started_); finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); @@ -338,25 +377,32 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { private: template <class R> - ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag) - : context_(context), call_(call) { + ClientAsyncWriter(Call call, ClientContext* context, R* response, bool start, + void* tag) + : context_(context), call_(call), started_(start) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - // if corked bit is set in context, we buffer up the initial metadata to - // coalesce with later message to be sent. No op is performed. - if (context_->initial_metadata_corked_) { - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); + if (start) { + StartCallInternal(tag); } else { + assert(tag == nullptr); + } + } + + void StartCallInternal(void* tag) { + write_ops_.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + // if corked bit is set in context, we just keep the initial metadata + // buffered up to coalesce with later message send. No op is performed. + if (!context_->initial_metadata_corked_) { write_ops_.set_output_tag(tag); - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); call_.PerformOps(&write_ops_); } } ClientContext* context_; Call call_; + bool started_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> write_ops_; @@ -388,20 +434,23 @@ template <class W, class R> class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface<W, R> { public: - /// Create a stream and write the first request out. + /// Create a stream object. + /// Start the RPC request if \a start is set. /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). + /// intitial metadata sent). If \a start is not set, \a tag must be + /// nullptr and the actual call must be initiated by StartCall /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. static ClientAsyncReaderWriter* Create(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, void* tag) { + ClientContext* context, bool start, + void* tag) { Call call = channel->CreateCall(method, context, cq); return new (g_core_codegen_interface->grpc_call_arena_alloc( call.call(), sizeof(ClientAsyncReaderWriter))) - ClientAsyncReaderWriter(call, context, tag); + ClientAsyncReaderWriter(call, context, start, tag); } // always allocated against a call arena, no memory free required @@ -409,6 +458,12 @@ class ClientAsyncReaderWriter final assert(size == sizeof(ClientAsyncReaderWriter)); } + void StartCall(void* tag) override { + assert(!started_); + started_ = true; + StartCallInternal(tag); + } + /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method /// for semantics of this method. /// @@ -417,6 +472,7 @@ class ClientAsyncReaderWriter final /// is updated with it, and then the receiving initial metadata can /// be accessed through this \a ClientContext. void ReadInitialMetadata(void* tag) override { + assert(started_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); @@ -425,6 +481,7 @@ class ClientAsyncReaderWriter final } void Read(R* msg, void* tag) override { + assert(started_); read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { read_ops_.RecvInitialMetadata(context_); @@ -434,6 +491,7 @@ class ClientAsyncReaderWriter final } void Write(const W& msg, void* tag) override { + assert(started_); write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); @@ -441,6 +499,7 @@ class ClientAsyncReaderWriter final } void Write(const W& msg, WriteOptions options, void* tag) override { + assert(started_); write_ops_.set_output_tag(tag); if (options.is_last_message()) { options.set_buffer_hint(); @@ -452,6 +511,7 @@ class ClientAsyncReaderWriter final } void WritesDone(void* tag) override { + assert(started_); write_ops_.set_output_tag(tag); write_ops_.ClientSendClose(); call_.PerformOps(&write_ops_); @@ -462,6 +522,7 @@ class ClientAsyncReaderWriter final /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. void Finish(Status* status, void* tag) override { + assert(started_); finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); @@ -471,23 +532,30 @@ class ClientAsyncReaderWriter final } private: - ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag) - : context_(context), call_(call) { - if (context_->initial_metadata_corked_) { - // if corked bit is set in context, we buffer up the initial metadata to - // coalesce with later message to be sent. No op is performed. - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); + ClientAsyncReaderWriter(Call call, ClientContext* context, bool start, + void* tag) + : context_(context), call_(call), started_(start) { + if (start) { + StartCallInternal(tag); } else { + assert(tag == nullptr); + } + } + + void StartCallInternal(void* tag) { + write_ops_.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + // if corked bit is set in context, we just keep the initial metadata + // buffered up to coalesce with later message send. No op is performed. + if (!context_->initial_metadata_corked_) { write_ops_.set_output_tag(tag); - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); call_.PerformOps(&write_ops_); } } ClientContext* context_; Call call_; + bool started_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index f0f909686b..e472f04f56 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -32,13 +32,18 @@ namespace grpc { class CompletionQueue; extern CoreCodegenInterface* g_core_codegen_interface; -/// An interface relevant for async client side unary RPCS (which send +/// An interface relevant for async client side unary RPCs (which send /// one request message to a server and receive one response message). template <class R> class ClientAsyncResponseReaderInterface { public: virtual ~ClientAsyncResponseReaderInterface() {} + /// Start the call that was set up by the constructor, but only if the + /// constructor was invoked through the "Prepare" API which doesn't actually + /// start the call + virtual void StartCall() = 0; + /// Request notification of the reading of initial metadata. Completion /// will be notified by \a tag on the associated completion queue. /// This call is optional, but if it is used, it cannot be used concurrently @@ -70,9 +75,10 @@ template <class R> class ClientAsyncResponseReader final : public ClientAsyncResponseReaderInterface<R> { public: - /// Start a call and write the request out. + /// Start a call and write the request out if \a start is set. /// \a tag will be notified on \a cq when the call has been started (i.e. /// intitial metadata sent) and \a request has been written out. + /// If \a start is not set, the actual call must be initiated by StartCall /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. template <class W> @@ -80,11 +86,11 @@ class ClientAsyncResponseReader final CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - const W& request) { + const W& request, bool start) { Call call = channel->CreateCall(method, context, cq); return new (g_core_codegen_interface->grpc_call_arena_alloc( call.call(), sizeof(ClientAsyncResponseReader))) - ClientAsyncResponseReader(call, context, request); + ClientAsyncResponseReader(call, context, request, start); } // always allocated against a call arena, no memory free required @@ -92,13 +98,20 @@ class ClientAsyncResponseReader final assert(size == sizeof(ClientAsyncResponseReader)); } + void StartCall() override { + assert(!started_); + started_ = true; + StartCallInternal(); + } + /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for /// semantics. /// /// Side effect: /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. - void ReadInitialMetadata(void* tag) { + void ReadInitialMetadata(void* tag) override { + assert(started_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); meta_buf.set_output_tag(tag); @@ -111,7 +124,8 @@ class ClientAsyncResponseReader final /// Side effect: /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. - void Finish(R* msg, Status* status, void* tag) { + void Finish(R* msg, Status* status, void* tag) override { + assert(started_); finish_buf.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_buf.RecvInitialMetadata(context_); @@ -125,15 +139,22 @@ class ClientAsyncResponseReader final private: ClientContext* const context_; Call call_; + bool started_; template <class W> - ClientAsyncResponseReader(Call call, ClientContext* context, const W& request) - : context_(context), call_(call) { - init_buf.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); + ClientAsyncResponseReader(Call call, ClientContext* context, const W& request, + bool start) + : context_(context), call_(call), started_(start) { + // Bind the metadata at time of StartCallInternal but set up the rest here // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(init_buf.SendMessage(request).ok()); init_buf.ClientSendClose(); + if (start) StartCallInternal(); + } + + void StartCallInternal() { + init_buf.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); call_.PerformOps(&init_buf); } diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b09bf99677..c2db8eff71 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -165,25 +165,37 @@ void PrintHeaderClientMethodInterfaces( (*vars)["Request"] = method->input_type_name(); (*vars)["Response"] = method->output_type_name(); + struct { + grpc::string prefix; + grpc::string method_params; // extra arguments to method + grpc::string raw_args; // extra arguments to raw version of method + } async_prefixes[] = {{"Async", ", void* tag", ", tag"}, + {"PrepareAsync", "", ""}}; + if (is_public) { if (method->NoStreaming()) { printer->Print( *vars, "virtual ::grpc::Status $Method$(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) = 0;\n"); - printer->Print(*vars, - "std::unique_ptr< " - "::grpc::ClientAsyncResponseReaderInterface< $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" - "Async$Method$Raw(context, request, cq));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "std::unique_ptr< " + "::grpc::ClientAsyncResponseReaderInterface< $Response$>> " + "$AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, request, cq));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -197,19 +209,26 @@ void PrintHeaderClientMethodInterfaces( "($Method$Raw(context, response));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>" - " Async$Method$(::grpc::ClientContext* context, $Response$* " - "response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncWriterInterface< $Request$>>(" - "Async$Method$Raw(context, response, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>" + " $AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "$Response$* " + "response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncWriterInterface< $Request$>>(" + "$AsyncPrefix$$Method$Raw(context, response, " + "cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -223,19 +242,25 @@ void PrintHeaderClientMethodInterfaces( "($Method$Raw(context, request));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> " - "Async$Method$(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderInterface< $Response$>>(" - "Async$Method$Raw(context, request, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> " + "$AsyncPrefix$$Method$(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderInterface< $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, request, cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (method->BidiStreaming()) { printer->Print(*vars, "std::unique_ptr< ::grpc::ClientReaderWriterInterface< " @@ -249,61 +274,83 @@ void PrintHeaderClientMethodInterfaces( "$Method$Raw(context));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< " - "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>(" - "Async$Method$Raw(context, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "std::unique_ptr< " + "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> " + "$AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } } else { if (method->NoStreaming()) { - printer->Print( - *vars, - "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) = 0;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " + "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) = 0;\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, "virtual ::grpc::ClientWriterInterface< $Request$>*" " $Method$Raw(" "::grpc::ClientContext* context, $Response$* response) = 0;\n"); - printer->Print(*vars, - "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*" - " Async$Method$Raw(::grpc::ClientContext* context, " - "$Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + printer->Print( + *vars, + "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*" + " $AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) = 0;\n"); - printer->Print( - *vars, - "virtual ::grpc::ClientAsyncReaderInterface< $Response$>* " - "Async$Method$Raw(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + printer->Print( + *vars, + "virtual ::grpc::ClientAsyncReaderInterface< $Response$>* " + "$AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n"); + } } else if (method->BidiStreaming()) { printer->Print(*vars, "virtual ::grpc::ClientReaderWriterInterface< $Request$, " "$Response$>* " "$Method$Raw(::grpc::ClientContext* context) = 0;\n"); - printer->Print(*vars, - "virtual ::grpc::ClientAsyncReaderWriterInterface< " - "$Request$, $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + printer->Print( + *vars, + "virtual ::grpc::ClientAsyncReaderWriterInterface< " + "$Request$, $Response$>* " + "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n"); + } } } } @@ -315,25 +362,35 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer, (*vars)["Method"] = method->name(); (*vars)["Request"] = method->input_type_name(); (*vars)["Response"] = method->output_type_name(); + struct { + grpc::string prefix; + grpc::string method_params; // extra arguments to method + grpc::string raw_args; // extra arguments to raw version of method + } async_prefixes[] = {{"Async", ", void* tag", ", tag"}, + {"PrepareAsync", "", ""}}; + if (is_public) { if (method->NoStreaming()) { printer->Print( *vars, "::grpc::Status $Method$(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) override;\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncResponseReader< $Response$>>(" - "Async$Method$Raw(context, request, cq));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " + "$AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) {\n"); + printer->Indent(); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncResponseReader< $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, request, cq));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -346,18 +403,24 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer, "($Method$Raw(context, response));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print(*vars, - "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>" - " Async$Method$(::grpc::ClientContext* context, " - "$Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>(" - "Async$Method$Raw(context, response, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print(*vars, + "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>" + " $AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>(" + "$AsyncPrefix$$Method$Raw(context, response, " + "cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -371,19 +434,24 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer, "($Method$Raw(context, request));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> " - "Async$Method$(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>>(" - "Async$Method$Raw(context, request, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> " + "$AsyncPrefix$$Method$(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, request, cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (method->BidiStreaming()) { printer->Print( *vars, @@ -396,53 +464,80 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer, "$Method$Raw(context));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print(*vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " - "$Request$, $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>(" - "Async$Method$Raw(context, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print(*vars, + "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " + "$Request$, $Response$>> " + "$AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } } else { if (method->NoStreaming()) { - printer->Print(*vars, - "::grpc::ClientAsyncResponseReader< $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) override;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "::grpc::ClientAsyncResponseReader< $Response$>* " + "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) override;\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientWriter< $Request$>* $Method$Raw(" "::grpc::ClientContext* context, $Response$* response) " "override;\n"); - printer->Print(*vars, - "::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw(" - "::grpc::ClientContext* context, $Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) override;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "::grpc::ClientAsyncWriter< $Request$>* $AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, $Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientReader< $Response$>* $Method$Raw(" "::grpc::ClientContext* context, const $Request$& request)" " override;\n"); - printer->Print( - *vars, - "::grpc::ClientAsyncReader< $Response$>* Async$Method$Raw(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) override;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "::grpc::ClientAsyncReader< $Response$>* $AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n"); + } } else if (method->BidiStreaming()) { printer->Print(*vars, "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$Method$Raw(::grpc::ClientContext* context) override;\n"); - printer->Print(*vars, - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) override;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " + "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n"); + } } } } @@ -1077,6 +1172,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, (*vars)["Method"] = method->name(); (*vars)["Request"] = method->input_type_name(); (*vars)["Response"] = method->output_type_name(); + struct { + grpc::string prefix; + grpc::string start; // bool literal expressed as string + grpc::string method_params; // extra arguments to method + grpc::string create_args; // extra arguments to creator + } async_prefixes[] = {{"Async", "true", ", void* tag", ", tag"}, + {"PrepareAsync", "false", "", ", nullptr"}}; if (method->NoStreaming()) { printer->Print(*vars, "::grpc::Status $ns$$Service$::Stub::$Method$(" @@ -1087,19 +1189,23 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "rpcmethod_$Method$_, " "context, request, response);\n" "}\n\n"); - printer->Print( - *vars, - "::grpc::ClientAsyncResponseReader< $Response$>* " - "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) {\n"); - printer->Print(*vars, - " return " - "::grpc::ClientAsyncResponseReader< $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, request);\n" - "}\n\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncStart"] = async_prefix.start; + printer->Print(*vars, + "::grpc::ClientAsyncResponseReader< $Response$>* " + "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::" + "ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) {\n"); + printer->Print(*vars, + " return " + "::grpc::ClientAsyncResponseReader< $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, request, $AsyncStart$);\n" + "}\n\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientWriter< $Request$>* " @@ -1111,17 +1217,23 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "rpcmethod_$Method$_, " "context, response);\n" "}\n\n"); - printer->Print(*vars, - "::grpc::ClientAsyncWriter< $Request$>* " - "$ns$$Service$::Stub::Async$Method$Raw(" - "::grpc::ClientContext* context, $Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " return ::grpc::ClientAsyncWriter< $Request$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, response, tag);\n" - "}\n\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncStart"] = async_prefix.start; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncCreateArgs"] = async_prefix.create_args; + printer->Print(*vars, + "::grpc::ClientAsyncWriter< $Request$>* " + "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, $Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Print(*vars, + " return ::grpc::ClientAsyncWriter< $Request$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, response, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -1134,17 +1246,24 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "rpcmethod_$Method$_, " "context, request);\n" "}\n\n"); - printer->Print(*vars, - "::grpc::ClientAsyncReader< $Response$>* " - "$ns$$Service$::Stub::Async$Method$Raw(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " return ::grpc::ClientAsyncReader< $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, request, tag);\n" - "}\n\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncStart"] = async_prefix.start; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncCreateArgs"] = async_prefix.create_args; + printer->Print( + *vars, + "::grpc::ClientAsyncReader< $Response$>* " + "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Print(*vars, + " return ::grpc::ClientAsyncReader< $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, request, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); + } } else if (method->BidiStreaming()) { printer->Print( *vars, @@ -1157,19 +1276,25 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "rpcmethod_$Method$_, " "context);\n" "}\n\n"); - printer->Print( - *vars, - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " - "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print( - *vars, - " return " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, tag);\n" - "}\n\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncStart"] = async_prefix.start; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncCreateArgs"] = async_prefix.create_args; + printer->Print(*vars, + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " + "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::" + "ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Print( + *vars, + " return " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); + } } } @@ -1460,50 +1585,79 @@ void PrintMockClientMethods(grpc_generator::Printer *printer, (*vars)["Request"] = method->input_type_name(); (*vars)["Response"] = method->output_type_name(); + struct { + grpc::string prefix; + grpc::string method_params; // extra arguments to method + int extra_method_param_count; + } async_prefixes[] = {{"Async", ", void* tag", 1}, {"PrepareAsync", "", 0}}; + if (method->NoStreaming()) { printer->Print( *vars, "MOCK_METHOD3($Method$, ::grpc::Status(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response));\n"); - printer->Print(*vars, - "MOCK_METHOD3(Async$Method$Raw, " - "::grpc::ClientAsyncResponseReaderInterface< $Response$>*" - "(::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq));\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "MOCK_METHOD3($AsyncPrefix$$Method$Raw, " + "::grpc::ClientAsyncResponseReaderInterface< $Response$>*" + "(::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq));\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, "MOCK_METHOD2($Method$Raw, " "::grpc::ClientWriterInterface< $Request$>*" "(::grpc::ClientContext* context, $Response$* response));\n"); - printer->Print(*vars, - "MOCK_METHOD4(Async$Method$Raw, " - "::grpc::ClientAsyncWriterInterface< $Request$>*" - "(::grpc::ClientContext* context, $Response$* response, " - "::grpc::CompletionQueue* cq, void* tag));\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["MockArgs"] = + std::to_string(3 + async_prefix.extra_method_param_count); + printer->Print(*vars, + "MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, " + "::grpc::ClientAsyncWriterInterface< $Request$>*" + "(::grpc::ClientContext* context, $Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$));\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, "MOCK_METHOD2($Method$Raw, " "::grpc::ClientReaderInterface< $Response$>*" "(::grpc::ClientContext* context, const $Request$& request));\n"); - printer->Print(*vars, - "MOCK_METHOD4(Async$Method$Raw, " - "::grpc::ClientAsyncReaderInterface< $Response$>*" - "(::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag));\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["MockArgs"] = + std::to_string(3 + async_prefix.extra_method_param_count); + printer->Print( + *vars, + "MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, " + "::grpc::ClientAsyncReaderInterface< $Response$>*" + "(::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$));\n"); + } } else if (method->BidiStreaming()) { printer->Print( *vars, "MOCK_METHOD1($Method$Raw, " "::grpc::ClientReaderWriterInterface< $Request$, $Response$>*" "(::grpc::ClientContext* context));\n"); - printer->Print( - *vars, - "MOCK_METHOD3(Async$Method$Raw, " - "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*" - "(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, " - "void* tag));\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["MockArgs"] = + std::to_string(2 + async_prefix.extra_method_param_count); + printer->Print( + *vars, + "MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, " + "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*" + "(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq" + "$AsyncMethodParams$));\n"); + } } } diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 66b1ef0e39..de2e449fe8 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -22,14 +22,29 @@ namespace grpc { +namespace { +std::unique_ptr<GenericClientAsyncReaderWriter> CallInternal( + ChannelInterface* channel, ClientContext* context, + const grpc::string& method, CompletionQueue* cq, bool start, void* tag) { + return std::unique_ptr<GenericClientAsyncReaderWriter>( + GenericClientAsyncReaderWriter::Create( + channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), + context, start, tag)); +} + +} // namespace + // begin a call to a named method std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { - return std::unique_ptr<GenericClientAsyncReaderWriter>( - GenericClientAsyncReaderWriter::Create( - channel_.get(), cq, - RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); + return CallInternal(channel_.get(), context, method, cq, true, tag); +} + +// setup a call to a named method +std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::PrepareCall( + ClientContext* context, const grpc::string& method, CompletionQueue* cq) { + return CallInternal(channel_.get(), context, method, cq, false, nullptr); } } // namespace grpc diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index b43c27f3f7..3d664e8825 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -65,6 +65,9 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq)); } + std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodA1Raw(context, request, cq)); + } // MethodA1 trailing comment 1 // MethodA2 detached leading comment 1 // @@ -76,6 +79,9 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>> PrepareAsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>>(PrepareAsyncMethodA2Raw(context, response, cq)); + } // MethodA2 trailing comment 1 // Method A3 leading comment 1 std::unique_ptr< ::grpc::ClientReaderInterface< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) { @@ -84,6 +90,9 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodA3Raw(context, request, cq)); + } // Method A3 trailing comment 1 // Method A4 leading comment 1 std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) { @@ -92,15 +101,22 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq)); + } // Method A4 trailing comment 1 private: virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; + virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientWriterInterface< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) = 0; virtual ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) = 0; + virtual ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>* PrepareAsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientReaderInterface< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) = 0; virtual ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) = 0; + virtual ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) = 0; virtual ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0; + virtual ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) = 0; }; class Stub final : public StubInterface { public: @@ -109,34 +125,50 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq)); } + std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodA1Raw(context, request, cq)); + } std::unique_ptr< ::grpc::ClientWriter< ::grpc::testing::Request>> MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response) { return std::unique_ptr< ::grpc::ClientWriter< ::grpc::testing::Request>>(MethodA2Raw(context, response)); } std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>> PrepareAsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>>(PrepareAsyncMethodA2Raw(context, response, cq)); + } std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) { return std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>>(MethodA3Raw(context, request)); } std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>> PrepareAsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>>(PrepareAsyncMethodA3Raw(context, request, cq)); + } std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) { return std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(MethodA4Raw(context)); } std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq)); + } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; + ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientWriter< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) override; ::grpc::ClientAsyncWriter< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) override; + ::grpc::ClientAsyncWriter< ::grpc::testing::Request>* PrepareAsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) override; ::grpc::ClientReader< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) override; ::grpc::ClientAsyncReader< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) override; + ::grpc::ClientAsyncReader< ::grpc::testing::Response>* PrepareAsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override; + ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override; const ::grpc::RpcMethod rpcmethod_MethodA1_; const ::grpc::RpcMethod rpcmethod_MethodA2_; const ::grpc::RpcMethod rpcmethod_MethodA3_; @@ -372,9 +404,13 @@ class ServiceB final { std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq)); } + std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq)); + } // MethodB1 trailing comment 1 private: virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; + virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; }; class Stub final : public StubInterface { public: @@ -383,10 +419,14 @@ class ServiceB final { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq)); } + std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq)); + } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; + ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; const ::grpc::RpcMethod rpcmethod_MethodB1_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); diff --git a/test/cpp/codegen/compiler_test_mock_golden b/test/cpp/codegen/compiler_test_mock_golden index 8e4b4d5911..f97c2dd83a 100644 --- a/test/cpp/codegen/compiler_test_mock_golden +++ b/test/cpp/codegen/compiler_test_mock_golden @@ -15,18 +15,23 @@ class MockServiceAStub : public ServiceA::StubInterface { public: MOCK_METHOD3(MethodA1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response)); MOCK_METHOD3(AsyncMethodA1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); + MOCK_METHOD3(PrepareAsyncMethodA1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); MOCK_METHOD2(MethodA2Raw, ::grpc::ClientWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response)); MOCK_METHOD4(AsyncMethodA2Raw, ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag)); + MOCK_METHOD3(PrepareAsyncMethodA2Raw, ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq)); MOCK_METHOD2(MethodA3Raw, ::grpc::ClientReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request)); MOCK_METHOD4(AsyncMethodA3Raw, ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag)); + MOCK_METHOD3(PrepareAsyncMethodA3Raw, ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); MOCK_METHOD1(MethodA4Raw, ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context)); MOCK_METHOD3(AsyncMethodA4Raw, ::grpc::ClientAsyncReaderWriterInterface<::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag)); + MOCK_METHOD2(PrepareAsyncMethodA4Raw, ::grpc::ClientAsyncReaderWriterInterface<::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq)); }; class MockServiceBStub : public ServiceB::StubInterface { public: MOCK_METHOD3(MethodB1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response)); MOCK_METHOD3(AsyncMethodB1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); + MOCK_METHOD3(PrepareAsyncMethodB1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); }; } // namespace grpc diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 912c871482..1a4438047d 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -56,11 +56,6 @@ class ClientRpcContext { } virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0; - void lock() { mu_.lock(); } - void unlock() { mu_.unlock(); } - - private: - std::mutex mu_; }; template <class RequestType, class ResponseType> @@ -73,7 +68,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> - start_req, + prepare_req, std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done) : context_(), stub_(stub), @@ -83,7 +78,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { next_state_(State::READY), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextUnaryImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq); @@ -92,7 +87,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { switch (next_state_) { case State::READY: start_ = UsageTimer::Now(); - response_reader_ = start_req_(stub_, &context_, req_, cq_); + response_reader_ = prepare_req_(stub_, &context_, req_, cq_); + response_reader_->StartCall(); next_state_ = State::RESP_DONE; response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); @@ -111,8 +107,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, - start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + prepare_req_, callback_); clone->StartInternal(cq); } @@ -130,7 +125,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> - start_req_; + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> @@ -252,29 +247,13 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { // this thread isn't supposed to shut down std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); if (shutdown_state_[thread_idx]->shutdown) { - // We want to delete the context. However, it is possible that - // another thread that just initiated an action on this - // context still has its lock even though the action on the - // context has completed. To delay for that, just grab the - // lock for serialization. Take a new scope. - { std::lock_guard<ClientRpcContext> lctx(*ctx); } delete ctx; return true; } - bool del = false; - - // Create a new scope for a lock_guard'ed region - { - std::lock_guard<ClientRpcContext> lctx(*ctx); - if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - // set the old version to delete - del = true; - } - } - if (del) { + if (!ctx->RunNextState(ok, entry)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); delete ctx; } return true; @@ -311,15 +290,15 @@ class AsyncUnaryClient final entry->set_status(s.error_code()); } static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>> - StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request, CompletionQueue* cq) { - return stub->AsyncUnaryCall(ctx, request, cq); + PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request, CompletionQueue* cq) { + return stub->PrepareAsyncUnaryCall(ctx, request, cq); }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, std::function<gpr_timespec()> next_issue, const SimpleRequest& req) { return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncUnaryClient::StartReq, + stub, req, next_issue, AsyncUnaryClient::PrepareReq, AsyncUnaryClient::CheckDone); } }; @@ -332,9 +311,8 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr< grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> - start_req, + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)> + prepare_req, std::function<void(grpc::Status, ResponseType*)> on_done) : context_(), stub_(stub), @@ -344,7 +322,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextStreamingPingPongImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq, config.messages_per_stream()); @@ -407,8 +385,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingPingPongImpl( - stub_, req_, next_issue_, start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + stub_, req_, next_issue_, prepare_req_, callback_); clone->StartInternal(cq, messages_per_stream_); } @@ -432,10 +409,10 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { State next_state_; std::function<void(grpc::Status, ResponseType*)> callback_; std::function<gpr_timespec()> next_issue_; - std::function<std::unique_ptr< - grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> - start_req_; + std::function< + std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)> + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> @@ -449,8 +426,9 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { cq_ = cq; messages_per_stream_ = messages_per_stream; messages_issued_ = 0; + stream_ = prepare_req_(stub_, &context_, cq); next_state_ = State::STREAM_IDLE; - stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); + stream_->StartCall(ClientRpcContext::tag(this)); } }; @@ -469,9 +447,9 @@ class AsyncStreamingPingPongClient final static void CheckDone(grpc::Status s, SimpleResponse* response) {} static std::unique_ptr< grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>> - StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, - CompletionQueue* cq, void* tag) { - auto stream = stub->AsyncStreamingCall(ctx, cq, tag); + PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, + CompletionQueue* cq) { + auto stream = stub->PrepareAsyncStreamingCall(ctx, cq); return stream; }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, @@ -479,7 +457,7 @@ class AsyncStreamingPingPongClient final const SimpleRequest& req) { return new ClientRpcContextStreamingPingPongImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncStreamingPingPongClient::StartReq, + stub, req, next_issue, AsyncStreamingPingPongClient::PrepareReq, AsyncStreamingPingPongClient::CheckDone); } }; @@ -492,8 +470,8 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>( BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*, - CompletionQueue*, void*)> - start_req, + CompletionQueue*)> + prepare_req, std::function<void(grpc::Status, ResponseType*)> on_done) : context_(), stub_(stub), @@ -503,7 +481,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromClientImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq); @@ -546,8 +524,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingFromClientImpl( - stub_, req_, next_issue_, start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + stub_, req_, next_issue_, prepare_req_, callback_); clone->StartInternal(cq); } @@ -570,17 +547,17 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>( BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*, - CompletionQueue*, void*)> - start_req_; + CompletionQueue*)> + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_; void StartInternal(CompletionQueue* cq) { cq_ = cq; - stream_ = start_req_(stub_, &context_, &response_, cq, - ClientRpcContext::tag(this)); + stream_ = prepare_req_(stub_, &context_, &response_, cq); next_state_ = State::STREAM_IDLE; + stream_->StartCall(ClientRpcContext::tag(this)); } }; @@ -597,10 +574,10 @@ class AsyncStreamingFromClientClient final private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} - static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> StartReq( + static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq( BenchmarkService::Stub* stub, grpc::ClientContext* ctx, - SimpleResponse* resp, CompletionQueue* cq, void* tag) { - auto stream = stub->AsyncStreamingFromClient(ctx, resp, cq, tag); + SimpleResponse* resp, CompletionQueue* cq) { + auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq); return stream; }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, @@ -608,7 +585,7 @@ class AsyncStreamingFromClientClient final const SimpleRequest& req) { return new ClientRpcContextStreamingFromClientImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncStreamingFromClientClient::StartReq, + stub, req, next_issue, AsyncStreamingFromClientClient::PrepareReq, AsyncStreamingFromClientClient::CheckDone); } }; @@ -621,8 +598,8 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*, void*)> - start_req, + CompletionQueue*)> + prepare_req, std::function<void(grpc::Status, ResponseType*)> on_done) : context_(), stub_(stub), @@ -632,7 +609,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromServerImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq); @@ -664,8 +641,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingFromServerImpl( - stub_, req_, next_issue_, start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + stub_, req_, next_issue_, prepare_req_, callback_); clone->StartInternal(cq); } @@ -682,8 +658,8 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*, void*)> - start_req_; + CompletionQueue*)> + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_; @@ -691,9 +667,9 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { void StartInternal(CompletionQueue* cq) { // TODO(vjpai): Add support to rate-pace this cq_ = cq; + stream_ = prepare_req_(stub_, &context_, req_, cq); next_state_ = State::STREAM_IDLE; - stream_ = - start_req_(stub_, &context_, req_, cq, ClientRpcContext::tag(this)); + stream_->StartCall(ClientRpcContext::tag(this)); } }; @@ -710,10 +686,10 @@ class AsyncStreamingFromServerClient final private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} - static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> StartReq( + static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq( BenchmarkService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& req, CompletionQueue* cq, void* tag) { - auto stream = stub->AsyncStreamingFromServer(ctx, req, cq, tag); + const SimpleRequest& req, CompletionQueue* cq) { + auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq); return stream; }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, @@ -721,7 +697,7 @@ class AsyncStreamingFromServerClient final const SimpleRequest& req) { return new ClientRpcContextStreamingFromServerImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncStreamingFromServerClient::StartReq, + stub, req, next_issue, AsyncStreamingFromServerClient::PrepareReq, AsyncStreamingFromServerClient::CheckDone); } }; @@ -733,8 +709,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, - const grpc::string& method_name, CompletionQueue*, void*)> - start_req, + const grpc::string& method_name, CompletionQueue*)> + prepare_req, std::function<void(grpc::Status, ByteBuffer*)> on_done) : context_(), stub_(stub), @@ -744,7 +720,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextGenericStreamingImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq, config.messages_per_stream()); @@ -807,8 +783,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextGenericStreamingImpl( - stub_, req_, next_issue_, start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + stub_, req_, next_issue_, prepare_req_, callback_); clone->StartInternal(cq, messages_per_stream_); } @@ -834,8 +809,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, - CompletionQueue*, void*)> - start_req_; + CompletionQueue*)> + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_; @@ -850,9 +825,9 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { "/grpc.testing.BenchmarkService/StreamingCall"); messages_per_stream_ = messages_per_stream; messages_issued_ = 0; + stream_ = prepare_req_(stub_, &context_, kMethodName, cq); next_state_ = State::STREAM_IDLE; - stream_ = start_req_(stub_, &context_, kMethodName, cq, - ClientRpcContext::tag(this)); + stream_->StartCall(ClientRpcContext::tag(this)); } }; @@ -874,17 +849,17 @@ class GenericAsyncStreamingClient final private: static void CheckDone(grpc::Status s, ByteBuffer* response) {} - static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq( + static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq( grpc::GenericStub* stub, grpc::ClientContext* ctx, - const grpc::string& method_name, CompletionQueue* cq, void* tag) { - auto stream = stub->Call(ctx, method_name, cq, tag); + const grpc::string& method_name, CompletionQueue* cq) { + auto stream = stub->PrepareCall(ctx, method_name, cq); return stream; }; static ClientRpcContext* SetupCtx(grpc::GenericStub* stub, std::function<gpr_timespec()> next_issue, const ByteBuffer& req) { return new ClientRpcContextGenericStreamingImpl( - stub, req, next_issue, GenericAsyncStreamingClient::StartReq, + stub, req, next_issue, GenericAsyncStreamingClient::PrepareReq, GenericAsyncStreamingClient::CheckDone); } }; |