diff options
Diffstat (limited to 'include/grpcpp/impl')
19 files changed, 675 insertions, 76 deletions
diff --git a/include/grpcpp/impl/codegen/async_generic_service.h b/include/grpcpp/impl/codegen/async_generic_service.h new file mode 100644 index 0000000000..2a0e1b4088 --- /dev/null +++ b/include/grpcpp/impl/codegen/async_generic_service.h @@ -0,0 +1,81 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H +#define GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H + +#include <grpcpp/impl/codegen/async_stream.h> +#include <grpcpp/impl/codegen/byte_buffer.h> + +struct grpc_server; + +namespace grpc { + +typedef ServerAsyncReaderWriter<ByteBuffer, ByteBuffer> + GenericServerAsyncReaderWriter; +typedef ServerAsyncResponseWriter<ByteBuffer> GenericServerAsyncResponseWriter; +typedef ServerAsyncReader<ByteBuffer, ByteBuffer> GenericServerAsyncReader; +typedef ServerAsyncWriter<ByteBuffer> GenericServerAsyncWriter; + +class GenericServerContext final : public ServerContext { + public: + const grpc::string& method() const { return method_; } + const grpc::string& host() const { return host_; } + + private: + friend class Server; + friend class ServerInterface; + + grpc::string method_; + grpc::string host_; +}; + +// A generic service at the server side accepts all RPC methods and hosts. It is +// typically used in proxies. The generic service can be registered to a server +// which also has other services. +// Sample usage: +// ServerBuilder builder; +// auto cq = builder.AddCompletionQueue(); +// AsyncGenericService generic_service; +// builder.RegisterAsyncGenericService(&generic_service); +// auto server = builder.BuildAndStart(); +// +// // request a new call +// GenericServerContext context; +// GenericServerAsyncReaderWriter stream; +// generic_service.RequestCall(&context, &stream, cq.get(), cq.get(), tag); +// +// When tag is retrieved from cq->Next(), context.method() can be used to look +// at the method and the RPC can be handled accordingly. +class AsyncGenericService final { + public: + AsyncGenericService() : server_(nullptr) {} + + void RequestCall(GenericServerContext* ctx, + GenericServerAsyncReaderWriter* reader_writer, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag); + + private: + friend class Server; + Server* server_; +}; + +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H diff --git a/include/grpcpp/impl/codegen/async_stream.h b/include/grpcpp/impl/codegen/async_stream.h index b2134590c3..6e58fd0eef 100644 --- a/include/grpcpp/impl/codegen/async_stream.h +++ b/include/grpcpp/impl/codegen/async_stream.h @@ -64,7 +64,7 @@ class ClientAsyncStreamingInterface { /// earlier call to \a AsyncReaderInterface::Read that yielded a failed /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). /// - /// This function will return when either: + /// The tag will be returned when either: /// - all incoming messages have been read and the server has returned /// a status. /// - the server has returned a non-OK status. @@ -114,6 +114,9 @@ class AsyncWriterInterface { /// queue BEFORE calling Write again. /// This is thread-safe with respect to \a AsyncReaderInterface::Read /// + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. + /// /// \param[in] msg The message to be written. /// \param[in] tag The tag identifying the operation. virtual void Write(const W& msg, void* tag) = 0; @@ -127,6 +130,9 @@ class AsyncWriterInterface { /// WriteOptions \a options is used to set the write options of this message. /// This is thread-safe with respect to \a AsyncReaderInterface::Read /// + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. + /// /// \param[in] msg The message to be written. /// \param[in] options The WriteOptions to be used to write this message. /// \param[in] tag The tag identifying the operation. @@ -144,6 +150,9 @@ class AsyncWriterInterface { /// the flow control window size. If \a msg size is larger than the window /// size, it will be sent on wire without buffering. /// + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. + /// /// \param[in] msg The message to be written. /// \param[in] options The WriteOptions to be used to write this message. /// \param[in] tag The tag identifying the operation. @@ -195,6 +204,13 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { assert(size == sizeof(ClientAsyncReader)); } + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + void StartCall(void* tag) override { assert(!started_); started_ = true; @@ -336,6 +352,13 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { assert(size == sizeof(ClientAsyncWriter)); } + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + void StartCall(void* tag) override { assert(!started_); started_ = true; @@ -496,6 +519,13 @@ class ClientAsyncReaderWriter final assert(size == sizeof(ClientAsyncReaderWriter)); } + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + void StartCall(void* tag) override { assert(!started_); started_ = true; @@ -630,6 +660,9 @@ class ServerAsyncReaderInterface /// metadata (if not sent already), response message, and status, or if /// some failure occurred when trying to do so. /// + /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it + /// is safe to to deallocate once Finish returns. + /// /// \param[in] tag Tag identifying this request. /// \param[in] status To be sent to the client as the result of this call. /// \param[in] msg To be sent to the client as the response for this call. @@ -650,6 +683,9 @@ class ServerAsyncReaderInterface /// metadata (if not sent already), and status, or if some failure occurred /// when trying to do so. /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once FinishWithError returns. + /// /// \param[in] tag Tag identifying this request. /// \param[in] status To be sent to the client as the result of this call. /// - Note: \a status must have a non-OK code. @@ -697,6 +733,9 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { /// initial and trailing metadata. /// /// Note: \a msg is not sent if \a status has a non-OK code. + /// + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once Finish returns. void Finish(const W& msg, const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { @@ -723,6 +762,9 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { /// - also sends initial metadata if not alreay sent. /// - uses the \a ServerContext associated with this call to send possible /// initial and trailing metadata. + /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once FinishWithError returns. void FinishWithError(const Status& status, void* tag) override { GPR_CODEGEN_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); @@ -773,6 +815,9 @@ class ServerAsyncWriterInterface /// metadata (if not sent already), response message, and status, or if /// some failure occurred when trying to do so. /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. + /// /// \param[in] tag Tag identifying this request. /// \param[in] status To be sent to the client as the result of this call. virtual void Finish(const Status& status, void* tag) = 0; @@ -784,6 +829,9 @@ class ServerAsyncWriterInterface /// WriteAndFinish is equivalent of performing WriteLast and Finish /// in a single step. /// + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once WriteAndFinish returns. + /// /// \param[in] msg The message to be written. /// \param[in] options The WriteOptions to be used to write this message. /// \param[in] status The Status that server returns to client. @@ -847,6 +895,9 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { /// for sending trailing (and initial) metadata to the client. /// /// Note: \a status must have an OK code. + /// + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once WriteAndFinish returns. void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, void* tag) override { write_ops_.set_output_tag(tag); @@ -865,6 +916,9 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { /// /// Note: there are no restrictions are the code of /// \a status,it may be non-OK + /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&finish_ops_); @@ -924,6 +978,9 @@ class ServerAsyncReaderWriterInterface /// metadata (if not sent already), response message, and status, or if some /// failure occurred when trying to do so. /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. + /// /// \param[in] tag Tag identifying this request. /// \param[in] status To be sent to the client as the result of this call. virtual void Finish(const Status& status, void* tag) = 0; @@ -935,6 +992,9 @@ class ServerAsyncReaderWriterInterface /// WriteAndFinish is equivalent of performing WriteLast and Finish in a /// single step. /// + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once WriteAndFinish returns. + /// /// \param[in] msg The message to be written. /// \param[in] options The WriteOptions to be used to write this message. /// \param[in] status The Status that server returns to client. @@ -1006,6 +1066,9 @@ class ServerAsyncReaderWriter final /// for sending trailing (and initial) metadata to the client. /// /// Note: \a status must have an OK code. + // + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once WriteAndFinish returns. void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, void* tag) override { write_ops_.set_output_tag(tag); @@ -1024,6 +1087,9 @@ class ServerAsyncReaderWriter final /// /// Note: there are no restrictions are the code of \a status, /// it may be non-OK + // + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&finish_ops_); diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h index 86c047ebe7..8cc5158115 100644 --- a/include/grpcpp/impl/codegen/byte_buffer.h +++ b/include/grpcpp/impl/codegen/byte_buffer.h @@ -45,6 +45,8 @@ template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; +template <StatusCode code> +class ErrorMethodHandler; template <class R> class DeserializeFuncType; class GrpcByteBufferPeer; @@ -144,6 +146,8 @@ class ByteBuffer final { friend class internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> friend class internal::ServerStreamingHandler; + template <StatusCode code> + friend class internal::ErrorMethodHandler; template <class R> friend class internal::DeserializeFuncType; friend class ProtoBufferReader; diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index 28cc4a9e20..7cadea0055 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -50,8 +50,6 @@ namespace internal { class Call; class CallHook; -const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin"; - // TODO(yangg) if the map is changed before we send, the pointers will be a // mess. Make sure it does not happen. inline grpc_metadata* FillMetadataArray( @@ -171,8 +169,8 @@ class WriteOptions { return *this; } - /// Guarantee that all bytes have been written to the wire before completing - /// this write (usually writes are completed when they pass flow control) + /// Guarantee that all bytes have been written to the socket before completing + /// this write (usually writes are completed when they pass flow control). inline WriteOptions& set_write_through() { SetBit(GRPC_WRITE_THROUGH); return *this; @@ -531,7 +529,6 @@ class CallOpRecvInitialMetadata { void FinishOp(bool* status) { if (metadata_map_ == nullptr) return; - metadata_map_->FillMap(); metadata_map_ = nullptr; } @@ -566,17 +563,14 @@ class CallOpClientRecvStatus { void FinishOp(bool* status) { if (recv_status_ == nullptr) return; - metadata_map_->FillMap(); - grpc::string binary_error_details; - auto iter = metadata_map_->map()->find(kBinaryErrorDetailsKey); - if (iter != metadata_map_->map()->end()) { - binary_error_details = - grpc::string(iter->second.begin(), iter->second.length()); - } - *recv_status_ = Status(static_cast<StatusCode>(status_code_), - grpc::string(GRPC_SLICE_START_PTR(error_message_), - GRPC_SLICE_END_PTR(error_message_)), - binary_error_details); + grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails(); + *recv_status_ = + Status(static_cast<StatusCode>(status_code_), + GRPC_SLICE_IS_EMPTY(error_message_) + ? grpc::string() + : grpc::string(GRPC_SLICE_START_PTR(error_message_), + GRPC_SLICE_END_PTR(error_message_)), + binary_error_details); client_context_->set_debug_error_string( debug_error_string_ != nullptr ? debug_error_string_ : ""); g_core_codegen_interface->grpc_slice_unref(error_message_); @@ -605,6 +599,11 @@ class CallOpSetInterface : public CompletionQueueTag { /// Fills in grpc_op, starting from ops[*nops] and moving /// upwards. virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0; + + /// Get the tag to be used at the core completion queue. Generally, the + /// value of cq_tag will be "this". However, it can be overridden if we + /// want core to process the tag differently (e.g., as a core callback) + virtual void* cq_tag() = 0; }; /// Primary implementation of CallOpSetInterface. @@ -624,7 +623,7 @@ class CallOpSet : public CallOpSetInterface, public Op5, public Op6 { public: - CallOpSet() : return_tag_(this), call_(nullptr) {} + CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {} void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override { this->Op1::AddOp(ops, nops); this->Op2::AddOp(ops, nops); @@ -651,7 +650,16 @@ class CallOpSet : public CallOpSetInterface, void set_output_tag(void* return_tag) { return_tag_ = return_tag; } + void* cq_tag() override { return cq_tag_; } + + /// set_cq_tag is used to provide a different core CQ tag than "this". + /// This is used for callback-based tags, where the core tag is the core + /// callback function. It does not change the use or behavior of any other + /// function (such as FinalizeResult) + void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + private: + void* cq_tag_; void* return_tag_; grpc_call* call_; }; diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h new file mode 100644 index 0000000000..ca2f867d04 --- /dev/null +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -0,0 +1,164 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H +#define GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H + +#include <functional> + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/config.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/status.h> + +namespace grpc { +namespace internal { + +/// An exception-safe way of invoking a user-specified callback function +template <class Func, class Arg> +void CatchingCallback(Func&& func, Arg&& arg) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(arg); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(arg); +#endif // GRPC_ALLOW_EXCEPTIONS +} + +// The contract on these tags is that they are single-shot. They must be +// constructed and then fired at exactly one point. There is no expectation +// that they can be reused without reconstruction. + +class CallbackWithStatusTag + : public grpc_experimental_completion_queue_functor { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithStatusTag)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f, + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops) { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithStatusTag::StaticRun; + } + ~CallbackWithStatusTag() {} + Status* status_ptr() { return &status_; } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(Status s) { + status_ = std::move(s); + Run(true); + } + + private: + grpc_call* call_; + std::function<void(Status)> func_; + CompletionQueueTag* ops_; + Status status_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackWithStatusTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + + GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &ok)); + GPR_CODEGEN_ASSERT(ignored == ops_); + + // Last use of func_ or status_, so ok to move them out + CatchingCallback(std::move(func_), std::move(status_)); + + func_ = nullptr; // reset to clear this out for sure + status_ = Status(); // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } +}; + +class CallbackWithSuccessTag + : public grpc_experimental_completion_queue_functor { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithSuccessTag)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops) { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithSuccessTag::StaticRun; + } + + CompletionQueueTag* ops() { return ops_; } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(bool ok) { Run(ok); } + + private: + grpc_call* call_; + std::function<void(bool)> func_; + CompletionQueueTag* ops_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackWithSuccessTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + bool new_ok = ok; + GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + GPR_CODEGEN_ASSERT(ignored == ops_); + + // Last use of func_, so ok to move it out for rvalue call above + CatchingCallback(std::move(func_), ok); + + func_ = nullptr; // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index ec1c6c25d8..b257acc1ab 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -41,6 +41,8 @@ class CallOpSetInterface; class RpcMethod; template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl; +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl; template <class R> class ClientAsyncReaderFactory; template <class W> @@ -103,6 +105,8 @@ class ChannelInterface { friend class ::grpc::internal::ClientAsyncResponseReaderFactory; template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + template <class InputMessage, class OutputMessage> + friend class ::grpc::internal::CallbackUnaryCallImpl; friend class ::grpc::internal::RpcMethod; virtual internal::Call CreateCall(const internal::RpcMethod& method, ClientContext* context, @@ -115,6 +119,16 @@ class ChannelInterface { CompletionQueue* cq, void* tag) = 0; virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; + + // EXPERIMENTAL + // A method to get the callbackable completion queue associated with this + // channel. If the return value is nullptr, this channel doesn't support + // callback operations. + // TODO(vjpai): Consider a better default like using a global CQ + // Returns nullptr (rather than being pure) since this is a new method + // and adding a new pure method to an interface would be a breaking change + // (even though this is private and non-API) + virtual CompletionQueue* CallbackCQ() { return nullptr; } }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h new file mode 100644 index 0000000000..4d4faea063 --- /dev/null +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -0,0 +1,95 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H +#define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H + +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/callback_common.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/config.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/status.h> + +namespace grpc { + +class Channel; +class ClientContext; +class CompletionQueue; + +namespace internal { +class RpcMethod; + +/// Perform a callback-based unary call +/// TODO(vjpai): Combine as much as possible with the blocking unary call code +template <class InputMessage, class OutputMessage> +void CallbackUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage* request, + OutputMessage* result, + std::function<void(Status)> on_completion) { + CallbackUnaryCallImpl<InputMessage, OutputMessage> x( + channel, method, context, request, result, on_completion); +} + +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl { + public: + CallbackUnaryCallImpl(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage* request, + OutputMessage* result, + std::function<void(Status)> on_completion) { + CompletionQueue* cq = channel->CallbackCQ(); + GPR_CODEGEN_ASSERT(cq != nullptr); + Call call(channel->CreateCall(method, context, cq)); + + using FullCallOpSet = + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, + CallOpClientSendClose, CallOpClientRecvStatus>; + + auto* ops = new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(FullCallOpSet))) FullCallOpSet; + + auto* tag = new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(CallbackWithStatusTag))) + CallbackWithStatusTag(call.call(), on_completion, ops); + + // TODO(vjpai): Unify code with sync API as much as possible + Status s = ops->SendMessage(*request); + if (!s.ok()) { + tag->force_run(s); + return; + } + ops->SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + ops->RecvInitialMetadata(context); + ops->RecvMessage(result); + ops->AllowNoMessage(); + ops->ClientSendClose(); + ops->ClientRecvStatus(context, tag->status_ptr()); + ops->set_cq_tag(tag); + call.PerformOps(ops); + } +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index 9dda4c7fac..46635a541a 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -68,6 +68,8 @@ class CallOpClientRecvStatus; class CallOpRecvInitialMetadata; template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl; +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl; } // namespace internal template <class R> @@ -389,6 +391,8 @@ class ClientContext { friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + template <class InputMessage, class OutputMessage> + friend class ::grpc::internal::CallbackUnaryCallImpl; // Used by friend class CallOpClientRecvStatus void set_debug_error_string(const grpc::string& debug_error_string) { @@ -425,8 +429,8 @@ class ClientContext { mutable std::shared_ptr<const AuthContext> auth_context_; struct census_context* census_context_; std::multimap<grpc::string, grpc::string> send_initial_metadata_; - internal::MetadataMap recv_initial_metadata_; - internal::MetadataMap trailing_metadata_; + mutable internal::MetadataMap recv_initial_metadata_; + mutable internal::MetadataMap trailing_metadata_; grpc_call* propagate_from_call_; PropagationOptions propagation_options_; diff --git a/include/grpcpp/impl/codegen/client_unary_call.h b/include/grpcpp/impl/codegen/client_unary_call.h index a37a81b75b..e4e8364e07 100644 --- a/include/grpcpp/impl/codegen/client_unary_call.h +++ b/include/grpcpp/impl/codegen/client_unary_call.h @@ -50,8 +50,8 @@ class BlockingUnaryCallImpl { ClientContext* context, const InputMessage& request, OutputMessage* result) { CompletionQueue cq(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}); // Pluckable completion queue + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}); // Pluckable completion queue Call call(channel->CreateCall(method, context, &cq)); CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 9713333cf5..f52f9a53be 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -78,9 +78,10 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; -class UnknownMethodHandler; template <class Streamer, bool WriteNeeded> class TemplatedBidiStreamingHandler; +template <StatusCode code> +class ErrorMethodHandler; template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl; } // namespace internal @@ -97,7 +98,8 @@ class CompletionQueue : private GrpcLibraryCodegen { /// instance. CompletionQueue() : CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING}) {} + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, + nullptr}) {} /// Wrap \a take, taking ownership of the instance. /// @@ -264,13 +266,17 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> friend class ::grpc::internal::TemplatedBidiStreamingHandler; - friend class ::grpc::internal::UnknownMethodHandler; + template <StatusCode code> + friend class ::grpc::internal::ErrorMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; friend class ::grpc::ServerInterface; template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + // Friends that need access to constructor for callback CQ + friend class ::grpc::Channel; + /// EXPERIMENTAL /// Creates a Thread Local cache to store the first event /// On this completion queue queued from this thread. Once @@ -367,7 +373,7 @@ class ServerCompletionQueue : public CompletionQueue { protected: /// Default constructor - ServerCompletionQueue() {} + ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {} private: /// \param is_frequently_polled Informs the GRPC library about whether the @@ -376,7 +382,7 @@ class ServerCompletionQueue : public CompletionQueue { /// frequently polled. ServerCompletionQueue(grpc_cq_polling_type polling_type) : CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type}), + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type, nullptr}), polling_type_(polling_type) {} grpc_cq_polling_type polling_type_; diff --git a/include/grpcpp/impl/codegen/completion_queue_tag.h b/include/grpcpp/impl/codegen/completion_queue_tag.h index ffb642c56b..304386a9ec 100644 --- a/include/grpcpp/impl/codegen/completion_queue_tag.h +++ b/include/grpcpp/impl/codegen/completion_queue_tag.h @@ -26,10 +26,25 @@ namespace internal { class CompletionQueueTag { public: virtual ~CompletionQueueTag() {} - /// Called prior to returning from Next(), return value is the status of the - /// operation (return status is the default thing to do). If this function - /// returns false, the tag is dropped and not returned from the completion - /// queue + + /// FinalizeResult must be called before informing user code that the + /// operation bound to the underlying core completion queue tag has + /// completed. In practice, this means: + /// + /// 1. For the sync API - before returning from Pluck + /// 2. For the CQ-based async API - before returning from Next + /// 3. For the callback-based API - before invoking the user callback + /// + /// This is the method that translates from core-side tag/status to + /// C++ API-observable tag/status. + /// + /// The return value is the status of the operation (returning status is the + /// general behavior of this function). If this function returns false, the + /// tag is dropped and not returned from the completion queue: this concept is + /// for events that are observed at core but not requested by the user + /// application (e.g., server shutdown, for server unimplemented method + /// responses, or for cases where a server-side RPC doesn't have a completion + /// notification registered using AsyncNotifyWhenDone) virtual bool FinalizeResult(void** tag, bool* status) = 0; }; } // namespace internal diff --git a/include/grpcpp/impl/codegen/metadata_map.h b/include/grpcpp/impl/codegen/metadata_map.h index 0866539d88..5e062a50f8 100644 --- a/include/grpcpp/impl/codegen/metadata_map.h +++ b/include/grpcpp/impl/codegen/metadata_map.h @@ -19,11 +19,15 @@ #ifndef GRPCPP_IMPL_CODEGEN_METADATA_MAP_H #define GRPCPP_IMPL_CODEGEN_METADATA_MAP_H +#include <grpc/impl/codegen/log.h> #include <grpcpp/impl/codegen/slice.h> namespace grpc { namespace internal { + +const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin"; + class MetadataMap { public: MetadataMap() { memset(&arr_, 0, sizeof(arr_)); } @@ -32,24 +36,54 @@ class MetadataMap { g_core_codegen_interface->grpc_metadata_array_destroy(&arr_); } - void FillMap() { - for (size_t i = 0; i < arr_.count; i++) { - // TODO(yangg) handle duplicates? - map_.insert(std::pair<grpc::string_ref, grpc::string_ref>( - StringRefFromSlice(&arr_.metadata[i].key), - StringRefFromSlice(&arr_.metadata[i].value))); + grpc::string GetBinaryErrorDetails() { + // if filled_, extract from the multimap for O(log(n)) + if (filled_) { + auto iter = map_.find(kBinaryErrorDetailsKey); + if (iter != map_.end()) { + return grpc::string(iter->second.begin(), iter->second.length()); + } + } + // if not yet filled, take the O(n) lookup to avoid allocating the + // multimap until it is requested. + // TODO(ncteisen): plumb this through core as a first class object, just + // like code and message. + else { + for (size_t i = 0; i < arr_.count; i++) { + if (strncmp(reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(arr_.metadata[i].key)), + kBinaryErrorDetailsKey, + GRPC_SLICE_LENGTH(arr_.metadata[i].key)) == 0) { + return grpc::string(reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(arr_.metadata[i].value)), + GRPC_SLICE_LENGTH(arr_.metadata[i].value)); + } + } } + return grpc::string(); } - std::multimap<grpc::string_ref, grpc::string_ref>* map() { return &map_; } - const std::multimap<grpc::string_ref, grpc::string_ref>* map() const { + std::multimap<grpc::string_ref, grpc::string_ref>* map() { + FillMap(); return &map_; } grpc_metadata_array* arr() { return &arr_; } private: + bool filled_ = false; grpc_metadata_array arr_; std::multimap<grpc::string_ref, grpc::string_ref> map_; + + void FillMap() { + if (filled_) return; + filled_ = true; + for (size_t i = 0; i < arr_.count; i++) { + // TODO(yangg) handle duplicates? + map_.insert(std::pair<grpc::string_ref, grpc::string_ref>( + StringRefFromSlice(&arr_.metadata[i].key), + StringRefFromSlice(&arr_.metadata[i].value))); + } + } }; } // namespace internal diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index 27552d79df..53117f941b 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -113,14 +113,15 @@ class ClientStreamingHandler : public MethodHandler { return func_(service_, param.server_context, &reader, &rsp); }); - GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_); CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops; - ops.SendInitialMetadata(param.server_context->initial_metadata_, - param.server_context->initial_metadata_flags()); - if (param.server_context->compression_level_set()) { - ops.set_compression_level(param.server_context->compression_level()); + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } } if (status.ok()) { status = ops.SendMessage(rsp); @@ -271,12 +272,14 @@ class SplitServerStreamingHandler ServerSplitStreamer<RequestType, ResponseType>, false>(func) {} }; -/// Handle unknown method by returning UNIMPLEMENTED error. -class UnknownMethodHandler : public MethodHandler { +/// General method handler class for errors that prevent real method use +/// e.g., handle unknown method by returning UNIMPLEMENTED error. +template <StatusCode code> +class ErrorMethodHandler : public MethodHandler { public: template <class T> static void FillOps(ServerContext* context, T* ops) { - Status status(StatusCode::UNIMPLEMENTED, ""); + Status status(code, ""); if (!context->sent_initial_metadata_) { ops->SendInitialMetadata(context->initial_metadata_, context->initial_metadata_flags()); @@ -293,9 +296,18 @@ class UnknownMethodHandler : public MethodHandler { FillOps(param.server_context, &ops); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); + // We also have to destroy any request payload in the handler parameter + ByteBuffer* payload = param.request.bbuf_ptr(); + if (payload != nullptr) { + payload->Clear(); + } } }; +typedef ErrorMethodHandler<StatusCode::UNIMPLEMENTED> UnknownMethodHandler; +typedef ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED> + ResourceExhaustedHandler; + } // namespace internal } // namespace grpc diff --git a/include/grpcpp/impl/codegen/rpc_service_method.h b/include/grpcpp/impl/codegen/rpc_service_method.h index dd85405e7a..5cf88e216f 100644 --- a/include/grpcpp/impl/codegen/rpc_service_method.h +++ b/include/grpcpp/impl/codegen/rpc_service_method.h @@ -25,6 +25,7 @@ #include <memory> #include <vector> +#include <grpc/impl/codegen/log.h> #include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/rpc_method.h> @@ -58,18 +59,57 @@ class RpcServiceMethod : public RpcMethod { /// Takes ownership of the handler RpcServiceMethod(const char* name, RpcMethod::RpcType type, MethodHandler* handler) - : RpcMethod(name, type), server_tag_(nullptr), handler_(handler) {} + : RpcMethod(name, type), + server_tag_(nullptr), + async_type_(AsyncType::UNSET), + handler_(handler) {} + + enum class AsyncType { + UNSET, + ASYNC, + RAW, + }; void set_server_tag(void* tag) { server_tag_ = tag; } void* server_tag() const { return server_tag_; } /// if MethodHandler is nullptr, then this is an async method MethodHandler* handler() const { return handler_.get(); } - void ResetHandler() { handler_.reset(); } void SetHandler(MethodHandler* handler) { handler_.reset(handler); } + void SetServerAsyncType(RpcServiceMethod::AsyncType type) { + if (async_type_ == AsyncType::UNSET) { + // this marks this method as async + handler_.reset(); + } else { + // this is not an error condition, as it allows users to declare a server + // like WithRawMethod_foo<AsyncService>. However since it + // overwrites behavior, it should be logged. + gpr_log( + GPR_INFO, + "You are marking method %s as '%s', even though it was " + "previously marked '%s'. This behavior will overwrite the original " + "behavior. If you expected this then ignore this message.", + name(), TypeToString(async_type_), TypeToString(type)); + } + async_type_ = type; + } private: void* server_tag_; + AsyncType async_type_; std::unique_ptr<MethodHandler> handler_; + + const char* TypeToString(RpcServiceMethod::AsyncType type) { + switch (type) { + case AsyncType::UNSET: + return "unset"; + case AsyncType::ASYNC: + return "async"; + case AsyncType::RAW: + return "raw"; + default: + GPR_UNREACHABLE_CODE(return "unknown"); + } + } }; } // namespace internal diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index bced4202dd..b58f029de9 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -63,9 +63,10 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; -class UnknownMethodHandler; template <class Streamer, bool WriteNeeded> class TemplatedBidiStreamingHandler; +template <StatusCode code> +class ErrorMethodHandler; class Call; } // namespace internal @@ -201,7 +202,7 @@ class ServerContext { /// \param algorithm The compression algorithm used for the server call. void set_compression_algorithm(grpc_compression_algorithm algorithm); - /// Set the load reporting costs in \a cost_data for the call. + /// Set the serialized load reporting costs in \a cost_data for the call. void SetLoadReportingCosts(const std::vector<grpc::string>& cost_data); /// Return the authentication context for this server call. @@ -226,6 +227,8 @@ class ServerContext { /// Async only. Has to be called before the rpc starts. /// Returns the tag in completion queue when the rpc finishes. /// IsCancelled() can then be called to check whether the rpc was cancelled. + /// TODO(vjpai): Fix this so that the tag is returned even if the call never + /// starts (https://github.com/grpc/grpc/issues/10136). void AsyncNotifyWhenDone(void* tag) { has_notify_when_done_tag_ = true; async_notify_when_done_tag_ = tag; @@ -262,7 +265,8 @@ class ServerContext { friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> friend class ::grpc::internal::TemplatedBidiStreamingHandler; - friend class ::grpc::internal::UnknownMethodHandler; + template <StatusCode code> + friend class internal::ErrorMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. @@ -290,7 +294,7 @@ class ServerContext { CompletionQueue* cq_; bool sent_initial_metadata_; mutable std::shared_ptr<const AuthContext> auth_context_; - internal::MetadataMap client_metadata_; + mutable internal::MetadataMap client_metadata_; std::multimap<grpc::string, grpc::string> initial_metadata_; std::multimap<grpc::string, grpc::string> trailing_metadata_; diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index cf330ac352..237991cde6 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -49,12 +49,35 @@ class ServerInterface : public internal::CallHook { public: virtual ~ServerInterface() {} - /// Shutdown the server, blocking until all rpc processing finishes. - /// Forcefully terminate pending calls after \a deadline expires. + /// \a Shutdown does the following things: + /// + /// 1. Shutdown the server: deactivate all listening ports, mark it in + /// "shutdown mode" so that further call Request's or incoming RPC matches + /// are no longer allowed. Also return all Request'ed-but-not-yet-active + /// calls as failed (!ok). This refers to calls that have been requested + /// at the server by the server-side library or application code but that + /// have not yet been matched to incoming RPCs from the client. Note that + /// this would even include default calls added automatically by the gRPC + /// C++ API without the user's input (e.g., "Unimplemented RPC method") + /// + /// 2. Block until all rpc method handlers invoked automatically by the sync + /// API finish. + /// + /// 3. If all pending calls complete (and all their operations are + /// retrieved by Next) before \a deadline expires, this finishes + /// gracefully. Otherwise, forcefully cancel all pending calls associated + /// with the server after \a deadline expires. In the case of the sync API, + /// if the RPC function for a streaming call has already been started and + /// takes a week to complete, the RPC function won't be forcefully + /// terminated (since that would leave state corrupt and incomplete) and + /// the method handler will just keep running (which will prevent the + /// server from completing the "join" operation that it needs to do at + /// shutdown time). /// /// All completion queue associated with the server (for example, for async /// serving) must be shutdown *after* this method has returned: /// See \a ServerBuilder::AddCompletionQueue for details. + /// They must also be drained (by repeated Next) after being shutdown. /// /// \param deadline How long to wait until pending rpcs are forcefully /// terminated. @@ -63,7 +86,7 @@ class ServerInterface : public internal::CallHook { ShutdownInternal(TimePoint<T>(deadline).raw_time()); } - /// Shutdown the server, waiting for all rpc processing to finish. + /// Shutdown the server without a deadline and forced cancellation. /// /// All completion queue associated with the server (for example, for async /// serving) must be shutdown *after* this method has returned: diff --git a/include/grpcpp/impl/codegen/service_type.h b/include/grpcpp/impl/codegen/service_type.h index a576f66911..9f1a052168 100644 --- a/include/grpcpp/impl/codegen/service_type.h +++ b/include/grpcpp/impl/codegen/service_type.h @@ -93,14 +93,19 @@ class Service { internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, + // Typecast the index to size_t for indexing into a vector + // while preserving the API that existed before a compiler + // warning was first seen (grpc/grpc#11664) + size_t idx = static_cast<size_t>(index); + server_->RequestAsyncCall(methods_[idx].get(), context, stream, call_cq, notification_cq, tag, request); } void RequestAsyncClientStreaming( int index, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, + size_t idx = static_cast<size_t>(index); + server_->RequestAsyncCall(methods_[idx].get(), context, stream, call_cq, notification_cq, tag); } template <class Message> @@ -108,14 +113,16 @@ class Service { int index, ServerContext* context, Message* request, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, + size_t idx = static_cast<size_t>(index); + server_->RequestAsyncCall(methods_[idx].get(), context, stream, call_cq, notification_cq, tag, request); } void RequestAsyncBidiStreaming( int index, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, + size_t idx = static_cast<size_t>(index); + server_->RequestAsyncCall(methods_[idx].get(), context, stream, call_cq, notification_cq, tag); } @@ -124,31 +131,52 @@ class Service { } void MarkMethodAsync(int index) { + // This does not have to be a hard error, however no one has approached us + // with a use case yet. Please file an issue if you believe you have one. + size_t idx = static_cast<size_t>(index); GPR_CODEGEN_ASSERT( - methods_[index].get() != nullptr && + methods_[idx].get() != nullptr && "Cannot mark the method as 'async' because it has already been " "marked as 'generic'."); - methods_[index]->ResetHandler(); + methods_[idx]->SetServerAsyncType( + internal::RpcServiceMethod::AsyncType::ASYNC); + } + + void MarkMethodRaw(int index) { + // This does not have to be a hard error, however no one has approached us + // with a use case yet. Please file an issue if you believe you have one. + size_t idx = static_cast<size_t>(index); + GPR_CODEGEN_ASSERT(methods_[idx].get() != nullptr && + "Cannot mark the method as 'raw' because it has already " + "been marked as 'generic'."); + methods_[idx]->SetServerAsyncType( + internal::RpcServiceMethod::AsyncType::RAW); } void MarkMethodGeneric(int index) { + // This does not have to be a hard error, however no one has approached us + // with a use case yet. Please file an issue if you believe you have one. + size_t idx = static_cast<size_t>(index); GPR_CODEGEN_ASSERT( - methods_[index]->handler() != nullptr && + methods_[idx]->handler() != nullptr && "Cannot mark the method as 'generic' because it has already been " - "marked as 'async'."); - methods_[index].reset(); + "marked as 'async' or 'raw'."); + methods_[idx].reset(); } void MarkMethodStreamed(int index, internal::MethodHandler* streamed_method) { - GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && + // This does not have to be a hard error, however no one has approached us + // with a use case yet. Please file an issue if you believe you have one. + size_t idx = static_cast<size_t>(index); + GPR_CODEGEN_ASSERT(methods_[idx] && methods_[idx]->handler() && "Cannot mark an async or generic method Streamed"); - methods_[index]->SetHandler(streamed_method); + methods_[idx]->SetHandler(streamed_method); // From the server's point of view, streamed unary is a special // case of BIDI_STREAMING that has 1 read and 1 write, in that order, // and split server-side streaming is BIDI_STREAMING with 1 read and // any number of writes, in that order. - methods_[index]->SetMethodType(internal::RpcMethod::BIDI_STREAMING); + methods_[idx]->SetMethodType(internal::RpcMethod::BIDI_STREAMING); } private: diff --git a/include/grpcpp/impl/codegen/sync_stream.h b/include/grpcpp/impl/codegen/sync_stream.h index 7152eaf41f..cbfcf25d0a 100644 --- a/include/grpcpp/impl/codegen/sync_stream.h +++ b/include/grpcpp/impl/codegen/sync_stream.h @@ -243,8 +243,8 @@ class ClientReader final : public ClientReaderInterface<R> { ClientContext* context, const W& request) : context_(context), cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, ::grpc::internal::CallOpSendMessage, @@ -377,8 +377,8 @@ class ClientWriter : public ClientWriterInterface<W> { ClientContext* context, R* response) : context_(context), cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -551,8 +551,8 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { ClientContext* context) : context_(context), cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { if (!context_->initial_metadata_corked_) { ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> diff --git a/include/grpcpp/impl/server_builder_plugin.h b/include/grpcpp/impl/server_builder_plugin.h index d73511e465..39450b42d5 100644 --- a/include/grpcpp/impl/server_builder_plugin.h +++ b/include/grpcpp/impl/server_builder_plugin.h @@ -37,8 +37,9 @@ class ServerBuilderPlugin { virtual ~ServerBuilderPlugin() {} virtual grpc::string name() = 0; - /// UpdateServerBuilder will be called at the beginning of - /// \a ServerBuilder::BuildAndStart(). + /// UpdateServerBuilder will be called at an early stage in + /// ServerBuilder::BuildAndStart(), right after the ServerBuilderOptions have + /// done their updates. virtual void UpdateServerBuilder(ServerBuilder* builder) {} /// InitServer will be called in ServerBuilder::BuildAndStart(), after the |