diff options
author | 2018-08-21 14:32:13 -0700 | |
---|---|---|
committer | 2018-08-30 15:46:27 -0700 | |
commit | 8bf52535d1008a3f38e70d1387846f0e942761b5 (patch) | |
tree | 4d162e97b5917cdd9373f55a02b34284e4354e55 /include/grpcpp | |
parent | d766b9a2a40fd3002d9b93c97525a868c9d92dc4 (diff) |
Provide a generic client-side unary callback API
Diffstat (limited to 'include/grpcpp')
-rw-r--r-- | include/grpcpp/channel.h | 5 | ||||
-rw-r--r-- | include/grpcpp/generic/generic_stub.h | 23 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/call.h | 10 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/callback_common.h | 79 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/channel_interface.h | 14 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_callback.h | 92 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_context.h | 4 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/completion_queue.h | 3 | ||||
-rw-r--r-- | include/grpcpp/support/client_callback.h | 24 |
9 files changed, 253 insertions, 1 deletions
diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h index fed02bf7bc..58a6f51664 100644 --- a/include/grpcpp/channel.h +++ b/include/grpcpp/channel.h @@ -78,8 +78,13 @@ class Channel final : public ChannelInterface, bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) override; + CompletionQueue* CallbackCQ() override; + const grpc::string host_; grpc_channel* const c_channel_; // owned + + CompletionQueue* callback_cq_ = nullptr; + std::mutex mu_; }; } // namespace grpc diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h index 92405a43fa..d509d9a520 100644 --- a/include/grpcpp/generic/generic_stub.h +++ b/include/grpcpp/generic/generic_stub.h @@ -19,9 +19,12 @@ #ifndef GRPCPP_GENERIC_GENERIC_STUB_H #define GRPCPP_GENERIC_GENERIC_STUB_H +#include <functional> + #include <grpcpp/support/async_stream.h> #include <grpcpp/support/async_unary_call.h> #include <grpcpp/support/byte_buffer.h> +#include <grpcpp/support/status.h> namespace grpc { @@ -62,6 +65,26 @@ class GenericStub final { ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag); + /// NOTE: class experimental_type is not part of the public API of this class + /// TODO(vjpai): Move these contents to the public API of GenericStub when + /// they are no longer experimental + class experimental_type { + public: + explicit experimental_type(GenericStub* stub) : stub_(stub) {} + + void UnaryCall(ClientContext* context, const grpc::string& method, + const ByteBuffer* request, ByteBuffer* response, + std::function<void(Status)> on_completion); + + private: + GenericStub* stub_; + }; + + /// NOTE: The function experimental() is not stable public API. It is a view + /// to the experimental components of this class. It may be changed or removed + /// at any time. + experimental_type experimental() { return experimental_type(this); } + private: std::shared_ptr<ChannelInterface> channel_; }; diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index a5e930aaa5..7bd03b6b1b 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -608,6 +608,9 @@ class CallOpSetInterface : public CompletionQueueTag { /// Fills in grpc_op, starting from ops[*nops] and moving /// upwards. virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0; + + /// Get the tag to be used at the CQ + virtual void* cq_tag() = 0; }; /// Primary implementation of CallOpSetInterface. @@ -627,7 +630,7 @@ class CallOpSet : public CallOpSetInterface, public Op5, public Op6 { public: - CallOpSet() : return_tag_(this), call_(nullptr) {} + CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {} void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override { this->Op1::AddOp(ops, nops); this->Op2::AddOp(ops, nops); @@ -654,7 +657,12 @@ class CallOpSet : public CallOpSetInterface, void set_output_tag(void* return_tag) { return_tag_ = return_tag; } + void* cq_tag() override { return cq_tag_; } + + void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + private: + void* cq_tag_; void* return_tag_; grpc_call* call_; }; diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h new file mode 100644 index 0000000000..5e1530a310 --- /dev/null +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -0,0 +1,79 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H +#define GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H + +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/config.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/status.h> + +// Forward declarations +namespace grpc_core { +class CQCallbackInterface; +}; + +namespace grpc { +namespace internal { + +class CallbackWithStatusTag { + public: + // TODO(vjpai): make impl and ops part of this structure to avoid allocation, + // ownership transfer, and delete + CallbackWithStatusTag(std::function<void(Status)> f, bool self_delete, + CompletionQueueTag* ops); + ~CallbackWithStatusTag() { delete ops_; } + void* tag() { return static_cast<void*>(impl_); } + Status* status_ptr() { return status_; } + CompletionQueueTag* ops() { return ops_; } + + // force_run can only be performed on a tag before it can ever be active + void force_run(Status s); + + private: + grpc_core::CQCallbackInterface* impl_; + Status* status_; + CompletionQueueTag* ops_; +}; + +class CallbackWithSuccessTag { + public: + // TODO(vjpai): make impl and ops part of this structure to avoid allocation, + // ownership transfer, and delete + CallbackWithSuccessTag(std::function<void(bool)> f, bool self_delete, + CompletionQueueTag* ops); + ~CallbackWithSuccessTag() { delete ops_; } + void* tag() { return static_cast<void*>(impl_); } + CompletionQueueTag* ops() { return ops_; } + + // force_run can only be performed on a tag before it can ever be active + void force_run(bool ok); + + private: + grpc_core::CQCallbackInterface* impl_; + CompletionQueueTag* ops_; +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index ec1c6c25d8..b257acc1ab 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -41,6 +41,8 @@ class CallOpSetInterface; class RpcMethod; template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl; +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl; template <class R> class ClientAsyncReaderFactory; template <class W> @@ -103,6 +105,8 @@ class ChannelInterface { friend class ::grpc::internal::ClientAsyncResponseReaderFactory; template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + template <class InputMessage, class OutputMessage> + friend class ::grpc::internal::CallbackUnaryCallImpl; friend class ::grpc::internal::RpcMethod; virtual internal::Call CreateCall(const internal::RpcMethod& method, ClientContext* context, @@ -115,6 +119,16 @@ class ChannelInterface { CompletionQueue* cq, void* tag) = 0; virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; + + // EXPERIMENTAL + // A method to get the callbackable completion queue associated with this + // channel. If the return value is nullptr, this channel doesn't support + // callback operations. + // TODO(vjpai): Consider a better default like using a global CQ + // Returns nullptr (rather than being pure) since this is a new method + // and adding a new pure method to an interface would be a breaking change + // (even though this is private and non-API) + virtual CompletionQueue* CallbackCQ() { return nullptr; } }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h new file mode 100644 index 0000000000..419933f85c --- /dev/null +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -0,0 +1,92 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H +#define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H + +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/callback_common.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/config.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/status.h> + +namespace grpc { + +class Channel; +class ClientContext; +class CompletionQueue; + +namespace internal { +class RpcMethod; + +/// Perform a callback-based unary call +/// TODO(vjpai): Combine as much as possible with the blocking unary call code +template <class InputMessage, class OutputMessage> +void CallbackUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage* request, + OutputMessage* result, + std::function<void(Status)> on_completion) { + CallbackUnaryCallImpl<InputMessage, OutputMessage> x( + channel, method, context, request, result, on_completion); +} + +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl { + public: + CallbackUnaryCallImpl(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage* request, + OutputMessage* result, + std::function<void(Status)> on_completion) { + CompletionQueue* cq = channel->CallbackCQ(); + GPR_CODEGEN_ASSERT(cq != nullptr); + + // TODO(vjpai): Allocate this as part of the tag's arena + auto* ops = new CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpRecvInitialMetadata, + CallOpRecvMessage<OutputMessage>, + CallOpClientSendClose, CallOpClientRecvStatus>; + + // TODO(vjpai): Move to using pre-allocated tags rather than new/self-delete + auto* tag = new CallbackWithStatusTag(on_completion, true, ops); + + // TODO(vjpai): Unify code with sync API as much as possible + Call call(channel->CreateCall(method, context, cq)); + Status s = ops->SendMessage(*request); + if (!s.ok()) { + tag->force_run(s); + return; + } + ops->SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + ops->RecvInitialMetadata(context); + ops->RecvMessage(result); + ops->AllowNoMessage(); + ops->ClientSendClose(); + ops->ClientRecvStatus(context, tag->status_ptr()); + ops->set_cq_tag(tag->tag()); + call.PerformOps(ops); + } +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index 9dda4c7fac..7db31fcbcf 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -68,6 +68,8 @@ class CallOpClientRecvStatus; class CallOpRecvInitialMetadata; template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl; +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl; } // namespace internal template <class R> @@ -389,6 +391,8 @@ class ClientContext { friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + template <class InputMessage, class OutputMessage> + friend class ::grpc::internal::CallbackUnaryCallImpl; // Used by friend class CallOpClientRecvStatus void set_debug_error_string(const grpc::string& debug_error_string) { diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 3f7d4fb765..f52f9a53be 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -274,6 +274,9 @@ class CompletionQueue : private GrpcLibraryCodegen { template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + // Friends that need access to constructor for callback CQ + friend class ::grpc::Channel; + /// EXPERIMENTAL /// Creates a Thread Local cache to store the first event /// On this completion queue queued from this thread. Once diff --git a/include/grpcpp/support/client_callback.h b/include/grpcpp/support/client_callback.h new file mode 100644 index 0000000000..03a7e1400c --- /dev/null +++ b/include/grpcpp/support/client_callback.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H +#define GRPCPP_SUPPORT_CLIENT_CALLBACK_H + +#include <grpcpp/impl/codegen/client_callback.h> + +#endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H |